How to use merge_dictionary method in autotest

Best Python code snippet using autotest_python

rabbitMQ_manager.py

Source:rabbitMQ_manager.py Github

copy

Full Screen

import json
import sys
import threading
import time
import uuid
from typing import List, Any, Callable, Optional

import pika as pika

# Number of hosts in the cluster
HOSTS = 3
# Maximum time, in seconds, to wait when expecting to receive responses by the agents after
# sending a request
TIMEOUT = 4
# list of topics to listen
topics_list = [
    "status_response",
    "containers_list_response",
    "config_response"
]
# dictionaries for aggregating the responses of the various hosts, when a request for all the
# hosts is sent. Keys are request tokens; values are lists of per-host responses.
containers_list_responses = {}
status_responses = {}
config_responses = {}
# lock objects used to guarantee mutual exclusion when manipulating the dictionaries above.
# This is important since the dictionaries can be accessed from different functions that are
# called in an asynchronous way by the REST server.
containers_list_responses_lock = threading.Lock()
status_responses_lock = threading.Lock()
config_responses_lock = threading.Lock()
# ip address of the host running the rabbitMQ broker
rabbitMQ_broker_address = '172.16.3.170'


def broker_callback(channel, method, properties, body) -> None:
    """
    Callback method for all messages received on the bound topics.

    Dispatches on the routing key and appends the decoded response to the
    aggregation dictionary matching the request token carried in the message.
    """
    # We decode the message and parse it from json format.
    if body is None:
        response = None
    else:
        response = json.loads(body.decode())
    # We check the topic of the message, on which the actions to be done depend.
    topic = method.routing_key
    # FIX: the original dereferenced `response` ("token" in response) even when the
    # body was empty and response was None, raising TypeError on every bound topic.
    # Processing is now skipped for empty bodies; the trace line is still printed.
    if response is not None:
        if topic == "containers_list_response":
            # this topic means that the message is a response to a request for the complete
            # list of containers running on the cluster. This response comes from one of the
            # hosts in the cluster.
            # The token in the response is the same that was previously sent in the request,
            # so it is used to identify the correct entry of the dictionary in which the
            # response must be inserted, so it can be merged with other responses to the
            # same request.
            if "token" in response:
                token = response["token"]
                # the lock ensures mutual exclusion on the containers_list_responses dictionary
                with containers_list_responses_lock:
                    if "containers" in response and token in containers_list_responses:
                        # the list of container names is appended to the correct entry of
                        # the containers_list_responses dictionary
                        containers_list_responses[token].append(response["containers"])
        if topic == "status_response":
            # a message with this topic is received in two cases only: when the message is a
            # response to a request for the status of all the containers monitored on the
            # cluster, and when it is a response to the request of the status of a specific
            # monitored container in the cluster. This response comes from one of the hosts
            # in the cluster. The token identifies the entry of the dictionary in which the
            # response must be inserted, so it can be merged with other eventual responses
            # to the same request.
            if "token" in response:
                token = response["token"]
                # the lock ensures mutual exclusion on the status_responses dictionary
                with status_responses_lock:
                    if "containers" in response and token in status_responses:
                        # a "containers" field means a response to a request for the status
                        # of all the containers monitored on the cluster. We append the list
                        # of dictionaries, each one with the data about one of the
                        # containers, to the correct entry of the status_responses dictionary
                        values = response["containers"].values()
                        values_list = list(values)
                        status_responses[token].append(values_list)
                    elif "container" in response and token in status_responses:
                        # a "container" field means a response to the request of the status
                        # of a specific monitored container in the cluster. We append the
                        # dictionary, containing the data about that container, to the
                        # correct entry of the status_responses dictionary
                        status_responses[token].append(response["container"])
        if topic == "config_response":
            # this topic means that the message is a response to a request for the
            # configuration parameters on all the agents in the cluster. This response comes
            # from one of the hosts in the cluster. The token identifies the entry of the
            # dictionary in which the response must be inserted, so it can be merged with
            # other responses to the same request.
            if "token" in response:
                token = response["token"]
                # the lock ensures mutual exclusion on the config_responses dictionary
                with config_responses_lock:
                    if "config" in response and token in config_responses:
                        # the dictionary, containing the current configuration of one agent,
                        # is appended to the correct entry of the config_responses dictionary
                        config_responses[token].append(response["config"])
    print("Received command on topic " + method.routing_key + ", body: " + str(response), file=sys.stderr)


def initialize_communication(broker: str, topics: List[str], callback: Any = None) -> None:
    """
    Opens a connection with a rabbitMQ broker running on the specified host. Declares a queue
    and binds it to the provided topics. Sets the provided callback as the handler
    to call when a new message is received on the queue.
    :param broker: The ip address of the host running the rabbitMQ broker, as a string
    :param topics: The list of topics on which the queue must be bind
    :param callback: A method to call when a new message is received. It must have the form: handler(channel, method, properties, body) -> None
    """
    # We open the connection with the RabbitMQ broker and declare a queue
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=broker))  # broker ip address --> node manager
    channel = connection.channel()
    channel.exchange_declare(exchange='topics', exchange_type='topic')
    # empty queue name lets the broker generate an exclusive, unique name
    result = channel.queue_declare('')
    queue_name = result.method.queue
    # We bind the queue to all the topics provided as parameter
    for topic in topics:
        channel.queue_bind(exchange='topics', queue=queue_name, routing_key=topic)
    # We set the callback to be called for messages received on the queue.
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    # We start listening on the queue in a new thread, because start_consuming() is
    # a blocking function
    threading.Thread(target=channel.start_consuming).start()


def send_message(broker: str, topic: str, body: Any) -> None:
    """
    Opens a connection with a rabbitMQ broker to send a message, then closes the connection.
    :param broker: the ip address of the broker
    :param topic: the topic related to the message
    :param body: the content of the message body
    """
    # We open the connection with a rabbitMQ broker.
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=broker))
    channel = connection.channel()
    channel.exchange_declare(exchange="topics", exchange_type="topic")
    # We encode the message in json format (needed since we sometimes need to send complex
    # objects like dictionaries and lists). We then encode the message in bytes and send it
    # with the topic provided as parameter, closing the connection at the end.
    message = json.dumps(body).encode()
    channel.basic_publish(exchange="topics", routing_key=topic, body=message)
    connection.close()


def add_container(container_name: str, hostname: str) -> None:
    """
    Sends a message to a agent and requests that a container is added to the
    monitored containers.
    :param container_name: the name of the container to add
    :param hostname: the name of the host on which the container runs
    """
    # prepends the name of the agent's host to the topic, to use the topic of that specific
    # host and avoid sending the message to all agents.
    send_message(rabbitMQ_broker_address, hostname + "add_container", container_name)


def remove_container(container_name: str, hostname: str) -> None:
    """
    Sends a message to a agent and requests that a container is removed from the
    monitored containers.
    :param container_name: the name of the container to remove
    :param hostname: the name of the host on which the container runs
    """
    # prepends the name of the agent's host to the topic, to use the topic of that specific
    # host and avoid sending the message to all agents.
    send_message(rabbitMQ_broker_address, hostname + "remove_container", container_name)


def set_threshold(threshold: float) -> None:
    """
    Sends a message to all agents, setting a new value for their packet loss threshold
    parameter.
    :param threshold: the new value for the threshold parameter
    """
    send_message(rabbitMQ_broker_address, "set_threshold", str(threshold))


def set_ping_retries(ping_retries: int) -> None:
    """
    Sends a message to all agents, setting a new value for their ping_retries
    parameter. That is, the number of packets sent to a container when trying to
    reach it for diagnostic reasons.
    :param ping_retries: the new value for the ping_retries parameter
    """
    send_message(rabbitMQ_broker_address, "set_ping_retries", str(ping_retries))


def set_monitoring_period(period: int) -> None:
    """
    Sends a message to all agents, setting a new value for their monitoring period
    parameter.
    :param period: the new value for the monitoring period
    """
    send_message(rabbitMQ_broker_address, "set_monitoring_period", str(period))


def await_and_merge_responses(request_token: str,
                              expected_responses: int,
                              merge_dictionary: dict,
                              merge_function: Callable[[Any], Any],
                              lock: threading.Lock,
                              timeout: int) -> Optional[list]:
    """
    Utility function that awaits for a certain number of responses to a
    specific request. Then it merges the received responses in an unique object
    using a custom method passed as parameter. It exploits a lock to manage the
    dictionary of responses in mutual exclusion.
    A timeout must be provided: when it elapses and the number of responses does not
    match the expected number, a partial result is returned.
    :param request_token: the request token, used to access the correct entry of the merge_dictionary
    :param expected_responses: the number of responses that the request should receive
    :param merge_dictionary: a dictionary in which the responses should eventually be found
    :param merge_function: a function that must be used to merge the different responses into a single response
    :param lock: the lock that must be used when accessing the merge_dictionary to avoid conflicts
    :param timeout: the maximum amount of time to wait for responses, in seconds
    :return: the merged (possibly partial) result, or None if no response arrived in time
    """
    # We save the starting time in a local variable, so we can check whether the timeout
    # is elapsed during execution.
    start = time.time()
    end = time.time()
    while True:
        # We periodically check if the merge_dictionary contains expected_responses values
        # associated with the request of interest. If this is true, we received all
        # responses and we can proceed with merging them and returning the result.
        with lock:
            if len(merge_dictionary[request_token]) == expected_responses:
                break
        # If it is not true, we check if the timeout is elapsed, in which case we leave the
        # loop. Otherwise we sleep for some milliseconds and continue with the loop.
        end = time.time()
        if (end - start) > timeout:
            break
        time.sleep(0.1)
    # Since we are outside the loop, we check the reason of exit. If the exit was caused by
    # the timeout, we print an error message.
    if (end - start) > timeout:
        print("Timeout elapsed on request " + request_token, file=sys.stderr)
    with lock:
        # If we don't find any response in the dictionary, we just delete the request's entry
        # and return None, since the timeout is surely elapsed in this case, and no host
        # responded.
        if len(merge_dictionary[request_token]) == 0:
            del merge_dictionary[request_token]
            return None
        else:
            # If we find at least one response, we merge the responses and delete the
            # request's entry in the dictionary, then we return the result, partial or
            # complete.
            result = merge_function(merge_dictionary[request_token])
            del merge_dictionary[request_token]
            return result


def merge_containers_status(lists: List[List[dict]]) -> List[dict]:
    """
    Utility function used to merge the responses to the containers status request into a
    single list.
    :param lists: the list of responses, each containing a list of dictionaries that represent the status of different containers
    """
    result = []
    for x in lists:
        result = result + x
    return result


def merge_container_status(responses: List[dict]) -> Optional[dict]:
    """
    Utility function used to obtain a single response to the container status request.
    :param responses: the list of responses, that should in any case only contain one response.
    """
    if len(responses) > 0:
        return responses[0]
    return None


def get_container_status(container_name=None, hostname=None) -> Any:
    """
    Sends a request for the status of one or all the containers, depending on the parameters.
    Then it awaits for all the expected responses and merges them into a single result that is
    returned.
    :param hostname: the name of the host on which the requested container runs. If this is left empty, a request for the status of all the containers will be sent.
    :param container_name: the name of the container of which the status is requested. This parameter is ignored if hostname is left empty.
    :return: a dictionary containing information about one container, if a container name is passed as parameter; a list of dictionary containing information about all the monitored containers otherwise
    """
    # We generate a random token for the request and initialize a entry in the
    # status_responses dictionary.
    request_uuid = str(uuid.uuid4())
    status_responses[request_uuid] = []
    if hostname is None:
        # if the hostname was not provided as parameter, we request the status of all the
        # containers, and we return the responses aggregated in a single result.
        send_message(rabbitMQ_broker_address, "all_containers_status", request_uuid)
        result = await_and_merge_responses(request_token=request_uuid,
                                           expected_responses=HOSTS,
                                           merge_dictionary=status_responses,
                                           merge_function=merge_containers_status,
                                           lock=status_responses_lock,
                                           timeout=TIMEOUT
                                           )
        return result
    else:
        # otherwise, if also a container name is present, we request the status of a single
        # container running on a host
        if container_name is None:
            # FIX: drop the entry created above before bailing out; the original left it
            # in status_responses forever, since no request (and thus no response or
            # cleanup) would ever reference this token.
            with status_responses_lock:
                del status_responses[request_uuid]
            return None
        # we insert the token in the request and send it to the specified host, prepending
        # its name to the topic. Then, we return the received response.
        request = {"token": request_uuid, "container": container_name}
        send_message(rabbitMQ_broker_address, hostname + "container_status", request)
        result = await_and_merge_responses(request_token=request_uuid,
                                           expected_responses=1,
                                           merge_dictionary=status_responses,
                                           merge_function=merge_container_status,
                                           lock=status_responses_lock,
                                           timeout=TIMEOUT
                                           )
        return result


def merge_containers_lists(lists: List[List[str]]) -> List[str]:
    """
    Utility function used to merge the responses to the containers list request into a
    single list.
    :param lists: the list of responses, each containing a list of container names
    """
    result = []
    for x in lists:
        result = result + x
    return result


def get_containers_list():
    """
    Sends a request for the list of all the names of containers running on all hosts of the cluster.
    Then it awaits for all the expected responses and merges them into a single list that is
    returned.
    """
    # We generate a random token for the request and initialize a entry in the
    # containers_list_responses dictionary.
    request_uuid = str(uuid.uuid4())
    containers_list_responses[request_uuid] = []
    # we request the list of all the names of containers running in the cluster,
    # and we return the responses aggregated in a single result.
    send_message(rabbitMQ_broker_address, "container_list", request_uuid)
    result = await_and_merge_responses(request_token=request_uuid,
                                       expected_responses=HOSTS,
                                       merge_dictionary=containers_list_responses,
                                       merge_function=merge_containers_lists,
                                       lock=containers_list_responses_lock,
                                       timeout=TIMEOUT
                                       )
    return result


def merge_configurations(configurations: List[dict]) -> List[dict]:
    """
    Utility function used to merge the responses to the configuration request into a
    single list.
    :param configurations: the list of responses, each containing a dictionary representing the current configuration on a container.
    """
    # this just return the parameter as it is, the function only exists to match the pattern
    # of the await_and_merge_responses function defined above.
    return configurations


def get_configuration():
    """
    Sends a request for the current configuration of all active agents.
    Then it awaits for all the expected responses and merges them into a single list that is
    returned.
    """
    # We generate a random token for the request and initialize a entry in the
    # config_responses dictionary.
    request_uuid = str(uuid.uuid4())
    config_responses[request_uuid] = []
    # we request the configuration of all the agents in the cluster,
    # and we return the responses aggregated in a single result.
    send_message(rabbitMQ_broker_address, "config", request_uuid)
    result = await_and_merge_responses(request_token=request_uuid,
                                       expected_responses=HOSTS,
                                       merge_dictionary=config_responses,
                                       merge_function=merge_configurations,
                                       lock=config_responses_lock,
                                       timeout=TIMEOUT
                                       )
    return result


# we initialize the communication in order to listen for responses to our requests.
# NOTE(review): the source was truncated at this point; the module presumably called
# initialize_communication(rabbitMQ_broker_address, topics_list, broker_callback) here —
# confirm against the original file before restoring the call.

Full Screen

Full Screen

mergedictionary2.py

Source:mergedictionary2.py Github

copy

Full Screen

#!/bin/python


def merge_dictionary(D1):
    """
    Return the concatenation of every list stored inside the first nested dict of D1.

    Only the first value of D1 that is itself a dict is inspected (matching the
    original behavior); its list values are concatenated in iteration order.
    :param D1: a dict whose values may include nested dicts containing lists
    :return: the concatenated lists, or [] if D1 contains no nested dict
    """
    # collect the values that are themselves dictionaries
    inner_dicts = [value for value in D1.values() if isinstance(value, dict)]
    # FIX: the original indexed inner_list_dict[0] unconditionally and raised
    # IndexError when no value was a dict; we return an empty list instead.
    # (The no-op statement `D1 = D1` was also removed.)
    if not inner_dicts:
        return []
    merged = []
    for value in inner_dicts[0].values():
        if isinstance(value, list):
            merged += value
    return merged


def final_merge(D1, D2):
    """
    Merge D2 into D1 in place and store the combined nested lists under D1['b']['d'].

    NOTE(review): the keys 'b' and 'd' are hard-coded to match the sample data
    below; this raises KeyError when D1 lacks a 'b' entry after the update —
    confirm whether a general version is wanted.
    :param D1: destination dict; mutated (updated with D2) and returned
    :param D2: source dict whose entries overwrite D1's on key collision
    :return: the mutated D1
    """
    # collect both sides' nested lists before D1 is mutated by update()
    merged_list = merge_dictionary(D1) + merge_dictionary(D2)
    D1.update(D2)
    D1['b']['d'] = merged_list
    return D1


D1 = {'a': 1, 'b': {'c': 3, 'd': [4, 5, 6]}, 'f': 7}
D2 = {'a': 1, 'c': 9, 'b': {'d': [8]}}
final_dict = final_merge(D1, D2)

Full Screen

Full Screen

Q8.py

Source:Q8.py Github

copy

Full Screen

d1 = {'a': 100, 'b': 200, 'c': 300}
d2 = {'a': 300, 'b': 200, 'd': 400}


def merge_dictionary(d1, d2):
    """
    Merge two dicts into a new dict; keys present in both map to the sum of their values.

    :param d1: first dict
    :param d2: second dict
    :return: a new dict with the union of keys; for shared keys the values are added
    """
    # Swap so the loop iterates over the smaller dict: the copy of the larger one
    # already carries most of the result. (FIX: the original comment claimed this
    # "reduces time complexity" — it only shortens the loop, the complexity is
    # unchanged. Note the swap also fixes which operand comes first in `+`, which
    # matters for non-commutative value types such as strings.)
    if len(d1) < len(d2):
        d1, d2 = d2, d1
    merged = d1.copy()
    for key, value in d2.items():
        # FIX(idiom): membership test on the dict itself instead of `.keys()`
        merged[key] = merged[key] + value if key in d1 else value
    return merged


# NOTE(review): the source was truncated inside this loop; printing each pair is
# the presumed original intent — confirm against the original file.
for key, value in merge_dictionary(d1, d2).items():
    print(key, value)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, i.e. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful