Best Python code snippet using stestr_python
test_rabbit_consumer_producer.py
Source:test_rabbit_consumer_producer.py
1import json2import os3import shutil4import unittest5from functools import partial6from multiprocessing import Process, Pipe7from typing import Dict, List, Tuple8import pika9from tp2_utils.rabbit_utils.message_set.disk_message_set import DiskMessageSet10from tp2_utils.rabbit_utils.rabbit_consumer_producer import RabbitQueueConsumerProducer11CONSUME_QUEUE = "consume_example"12RESPONSE_QUEUE = "response_example"13class TestRabbitQueueConsumerProducer(unittest.TestCase):14 @staticmethod15 def consume_filter(message: Dict) -> Tuple[List[Dict], bool]:16 if message["type"] == "A":17 return [message], False18 return [], False19 @staticmethod20 def publish_multiple(message: Dict) -> Tuple[List[Dict], bool]:21 if isinstance(message["value"], int) or isinstance(message["value"], float):22 return [{"type": message["type"]}] * int(message["value"]), False23 return [], False24 @staticmethod25 def republish_and_stop_with_key_z(message: Dict) -> Tuple[List[Dict], bool]:26 if message['key'] != 'Z':27 return [message], False28 else:29 return [message], True30 def _start_process(self, func, messages_to_group=1, idempotency_set=None):31 RabbitQueueConsumerProducer("localhost", CONSUME_QUEUE, [RESPONSE_QUEUE], func,32 messages_to_group=messages_to_group,33 idempotency_set=idempotency_set)()34 @staticmethod35 def _read_process(write_pipe: Pipe):36 def consume(write_pipe, ch, method, properties, body):37 write_pipe.send(body)38 connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))39 channel = connection.channel()40 channel.basic_consume(queue=RESPONSE_QUEUE,41 on_message_callback=partial(consume, write_pipe),42 auto_ack=True)43 channel.start_consuming()44 def setUp(self) -> None:45 try:46 from pytest_cov.embed import cleanup_on_sigterm47 except ImportError:48 pass49 else:50 cleanup_on_sigterm()51 shutil.rmtree('/tmp/message_set', ignore_errors=True)52 os.mkdir('/tmp/message_set')53 self.message_set = DiskMessageSet('/tmp/message_set')54 self.recv_pipe, 
self.write_pipe = Pipe(False)55 self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))56 self.channel = self.connection.channel()57 self.channel.queue_declare(queue=CONSUME_QUEUE)58 self.channel.queue_declare(queue=RESPONSE_QUEUE)59 self.channel.queue_purge(CONSUME_QUEUE)60 self.channel.queue_purge(RESPONSE_QUEUE)61 self.test_process = None62 self.consume_process = Process(target=self._read_process, args=(self.write_pipe,))63 self.consume_process.start()64 def tearDown(self) -> None:65 self.channel.queue_purge(CONSUME_QUEUE)66 self.channel.queue_purge(RESPONSE_QUEUE)67 if self.test_process:68 self.test_process.terminate()69 self.consume_process.terminate()70 shutil.rmtree('/tmp/message_set', ignore_errors=True)71 def test_simple_filter(self):72 self.test_process = Process(target=self._start_process, args=(self.consume_filter,))73 self.test_process.start()74 self.channel.queue_declare(queue=CONSUME_QUEUE)75 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,76 body=json.dumps({"type": "A", "value": 4.2}))77 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,78 body=json.dumps({"type": "B", "value": 5}))79 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,80 body=json.dumps({"type": "C", "value": "a"}))81 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,82 body=json.dumps({"type": "D", "value": 4}))83 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,84 body=json.dumps({"type": "A", "value": 2.2}))85 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,86 body=json.dumps({"type": "A", "value": 4.1}))87 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,88 body=json.dumps([{"type": "A", "value": None},89 {"type": "V", "value": None}]))90 processed_data = []91 for _ in range(4):92 processed_data.append(json.loads(self.recv_pipe.recv()))93 self.assertFalse(self.recv_pipe.poll(1))94 self.assertEqual(processed_data[0], {"type": "A", 
"value": 4.2})95 self.assertEqual(processed_data[1], {"type": "A", "value": 2.2})96 self.assertEqual(processed_data[2], {"type": "A", "value": 4.1})97 self.assertEqual(processed_data[3], {"type": "A", "value": None})98 def test_simple_filter_with_grouping(self):99 self.test_process = Process(target=self._start_process, args=(self.consume_filter, 2))100 self.test_process.start()101 self.channel.queue_declare(queue=CONSUME_QUEUE)102 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,103 body=json.dumps({"type": "A", "value": 4.2}))104 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,105 body=json.dumps({"type": "B", "value": 85}))106 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,107 body=json.dumps({"type": "C", "value": "a"}))108 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,109 body=json.dumps({"type": "D", "value": 4}))110 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,111 body=json.dumps({"type": "A", "value": 2.2}))112 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,113 body=json.dumps({"type": "A", "value": 4.1}))114 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,115 body=json.dumps([{"type": "A", "value": None},116 {"type": "V", "value": None}]))117 processed_data = []118 for _ in range(4):119 processed_data.append(json.loads(self.recv_pipe.recv()))120 self.assertFalse(self.recv_pipe.poll(1))121 self.assertEqual(processed_data[0], [{"type": "A", "value": 4.2}])122 self.assertEqual(processed_data[1], [{"type": "A", "value": 2.2}])123 self.assertEqual(processed_data[2], [{"type": "A", "value": 4.1}])124 self.assertEqual(processed_data[3], [{"type": "A", "value": None}])125 def test_simple_multipy_message_with_grouping(self):126 self.test_process = Process(target=self._start_process, args=(self.publish_multiple, 2))127 self.test_process.start()128 self.channel.queue_declare(queue=CONSUME_QUEUE)129 self.channel.basic_publish(exchange='', 
routing_key=CONSUME_QUEUE,130 body=json.dumps({"type": "A", "value": 4.2}))131 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,132 body=json.dumps({"type": "B", "value": 7}))133 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,134 body=json.dumps({"type": "C", "value": "a"}))135 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,136 body=json.dumps({"type": "D", "value": 1}))137 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,138 body=json.dumps([{"type": "A", "value": None},139 {"type": "V", "value": None}]))140 processed_data = []141 for _ in range(7):142 processed_data.append(json.loads(self.recv_pipe.recv()))143 self.assertFalse(self.recv_pipe.poll(1))144 self.assertEqual(processed_data[0], [{"type": "A"}, {"type": "A"}])145 self.assertEqual(processed_data[1], [{"type": "A"}, {"type": "A"}])146 self.assertEqual(processed_data[2], [{"type": "B"}, {"type": "B"}])147 self.assertEqual(processed_data[3], [{"type": "B"}, {"type": "B"}])148 self.assertEqual(processed_data[4], [{"type": "B"}, {"type": "B"}])149 self.assertEqual(processed_data[5], [{"type": "B"}])150 self.assertEqual(processed_data[6], [{"type": "D"}])151 def test_idempotency_set_integration(self):152 self.test_process = Process(target=self._start_process,153 args=(self.publish_multiple, 2, self.message_set))154 self.test_process.start()155 self.channel.queue_declare(queue=CONSUME_QUEUE)156 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,157 body=json.dumps({"type": "A", "value": 4.2}))158 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,159 body=json.dumps({"type": "B", "value": 7}))160 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,161 body=json.dumps({"type": "C", "value": "a"}))162 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,163 body=json.dumps({"type": "D", "value": 1}))164 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,165 
body=json.dumps([{"type": "A", "value": 4.2},166 {"type": "V", "value": 1}]))167 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,168 body=json.dumps({"type": "D", "value": 1}))169 processed_data = []170 for _ in range(8):171 processed_data.append(json.loads(self.recv_pipe.recv()))172 self.assertFalse(self.recv_pipe.poll(1))173 self.assertEqual(processed_data[0], [{"type": "A"}, {"type": "A"}])174 self.assertEqual(processed_data[1], [{"type": "A"}, {"type": "A"}])175 self.assertEqual(processed_data[2], [{"type": "B"}, {"type": "B"}])176 self.assertEqual(processed_data[3], [{"type": "B"}, {"type": "B"}])177 self.assertEqual(processed_data[4], [{"type": "B"}, {"type": "B"}])178 self.assertEqual(processed_data[5], [{"type": "B"}])179 self.assertEqual(processed_data[6], [{"type": "D"}])180 self.assertEqual(processed_data[7], [{"type": "V"}])181 def test_simple_stop(self):182 self.test_process = Process(target=self._start_process,183 args=(self.republish_and_stop_with_key_z, 2, self.message_set))184 self.test_process.start()185 self.channel.queue_declare(queue=CONSUME_QUEUE)186 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,187 body=json.dumps({"key": "A", "value": 4.2}))188 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,189 body=json.dumps({"key": "Z", "value": 7}))190 self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,191 body=json.dumps({"key": "C", "value": "a"}))192 self.test_process.join()193 processed_data = []194 for _ in range(2):195 processed_data.append(json.loads(self.recv_pipe.recv()))196 self.assertFalse(self.recv_pipe.poll(1))197 self.assertEqual(processed_data[0], [{"key": "A", "value": 4.2}])...
ch15-ch16-serverpool-and-queue.py
Source:ch15-ch16-serverpool-and-queue.py
...64 return random.gauss( 1200, 5 )65 if global_time > 2200:66 return random.gauss( 800, 5 )67 return random.gauss( 1000, 5 )68def consume_queue():69 a, b = 20, 270 return 100*random.betavariate( a, b ) # mean: a/(a+b); var: ~b/a^271# ============================================================72# Server Pool73def statictest( traffic ):74 def loadqueue():75 return random.gauss( traffic, traffic/200 )76 77 fb.static_test( ServerPool, ( 0, consume_queue, loadqueue ),78 20, 20, 5, 1000 ) # max u, steps, trials, timesteps79def closedloop1():80 # Closed loop, setpoint 0.6-0.8, PID Controller81 82 def loadqueue():...
rabbitmq.py
Source:rabbitmq.py
1import pika2from os import environ3from dotenv import load_dotenv4from utils.log_utils import log5load_dotenv()6class RabbitmqService:7 def __init__(self, consume_queue='', post_routing_key='', post_exchange='', post_queue=''):8 self.consume_queue = consume_queue9 self.post_routing_key = post_routing_key10 self.post_exchange = post_exchange11 self.post_queue = post_queue12 self.log = log13 self._create_connection()14 if consume_queue:15 self.channel.queue_declare(queue=consume_queue)16 if post_queue:17 self.channel.queue_declare(queue=post_queue)18 if post_exchange:19 self.channel.queue_declare(queue=post_queue)20 self.channel.exchange_declare(exchange=post_exchange)21 self.channel.queue_bind(post_queue, post_exchange, post_routing_key)22 def _create_connection(self):23 self.connection = pika.BlockingConnection(24 pika.ConnectionParameters(environ.get('RABBITMQ_URL'), 25 environ.get('RABBITMQ_PORT'), 26 virtual_host=environ.get('RABBITMQ_VHOST')))27 self.channel = self.connection.channel()28 def consume(self, callback):29 while True:30 try:31 self.channel.basic_qos(prefetch_count=1)32 self.channel.basic_consume(queue=self.consume_queue,33 on_message_callback=callback,34 auto_ack=True35 )36 return self.channel.start_consuming()37 except pika.exceptions.ConnectionClosedByBroker:38 break39 except pika.exceptions.AMQPChannelError:40 break41 except pika.exceptions.AMQPConnectionError:42 continue43 def post(self, message):44 self.channel.basic_publish(exchange=self.post_exchange,45 routing_key=self.post_routing_key,46 body=message)47 self.log.info("Sent validation response [%s]" % message)48 def close_connection(self):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!