Best Python code snippet using stestr_python
test_rabbit_consumer_producer.py
Source:test_rabbit_consumer_producer.py  
1import json2import os3import shutil4import unittest5from functools import partial6from multiprocessing import Process, Pipe7from typing import Dict, List, Tuple8import pika9from tp2_utils.rabbit_utils.message_set.disk_message_set import DiskMessageSet10from tp2_utils.rabbit_utils.rabbit_consumer_producer import RabbitQueueConsumerProducer11CONSUME_QUEUE = "consume_example"12RESPONSE_QUEUE = "response_example"13class TestRabbitQueueConsumerProducer(unittest.TestCase):14    @staticmethod15    def consume_filter(message: Dict) -> Tuple[List[Dict], bool]:16        if message["type"] == "A":17            return [message], False18        return [], False19    @staticmethod20    def publish_multiple(message: Dict) -> Tuple[List[Dict], bool]:21        if isinstance(message["value"], int) or isinstance(message["value"], float):22            return [{"type": message["type"]}] * int(message["value"]), False23        return [], False24    @staticmethod25    def republish_and_stop_with_key_z(message: Dict) -> Tuple[List[Dict], bool]:26        if message['key'] != 'Z':27            return [message], False28        else:29            return [message], True30    def _start_process(self, func, messages_to_group=1, idempotency_set=None):31        RabbitQueueConsumerProducer("localhost", CONSUME_QUEUE, [RESPONSE_QUEUE], func,32                                    messages_to_group=messages_to_group,33                                    idempotency_set=idempotency_set)()34    @staticmethod35    def _read_process(write_pipe: Pipe):36        def consume(write_pipe, ch, method, properties, body):37            write_pipe.send(body)38        connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))39        channel = connection.channel()40        channel.basic_consume(queue=RESPONSE_QUEUE,41                              on_message_callback=partial(consume, write_pipe),42                              auto_ack=True)43        channel.start_consuming()44    def setUp(self) -> None:45        try:46            from pytest_cov.embed import cleanup_on_sigterm47        except ImportError:48            pass49        else:50            cleanup_on_sigterm()51        shutil.rmtree('/tmp/message_set', ignore_errors=True)52        os.mkdir('/tmp/message_set')53        self.message_set = DiskMessageSet('/tmp/message_set')54        self.recv_pipe, self.write_pipe = Pipe(False)55        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))56        self.channel = self.connection.channel()57        self.channel.queue_declare(queue=CONSUME_QUEUE)58        self.channel.queue_declare(queue=RESPONSE_QUEUE)59        self.channel.queue_purge(CONSUME_QUEUE)60        self.channel.queue_purge(RESPONSE_QUEUE)61        self.test_process = None62        self.consume_process = Process(target=self._read_process, args=(self.write_pipe,))63        self.consume_process.start()64    def tearDown(self) -> None:65        self.channel.queue_purge(CONSUME_QUEUE)66        self.channel.queue_purge(RESPONSE_QUEUE)67        if self.test_process:68            self.test_process.terminate()69        self.consume_process.terminate()70        shutil.rmtree('/tmp/message_set', ignore_errors=True)71    def test_simple_filter(self):72        self.test_process = Process(target=self._start_process, args=(self.consume_filter,))73        self.test_process.start()74        self.channel.queue_declare(queue=CONSUME_QUEUE)75        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,76                                   body=json.dumps({"type": "A", "value": 4.2}))77        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,78                                   body=json.dumps({"type": "B", "value": 5}))79        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,80                                   body=json.dumps({"type": "C", "value": "a"}))81        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,82                                   body=json.dumps({"type": "D", "value": 4}))83        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,84                                   body=json.dumps({"type": "A", "value": 2.2}))85        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,86                                   body=json.dumps({"type": "A", "value": 4.1}))87        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,88                                   body=json.dumps([{"type": "A", "value": None},89                                                    {"type": "V", "value": None}]))90        processed_data = []91        for _ in range(4):92            processed_data.append(json.loads(self.recv_pipe.recv()))93        self.assertFalse(self.recv_pipe.poll(1))94        self.assertEqual(processed_data[0], {"type": "A", "value": 4.2})95        self.assertEqual(processed_data[1], {"type": "A", "value": 2.2})96        self.assertEqual(processed_data[2], {"type": "A", "value": 4.1})97        self.assertEqual(processed_data[3], {"type": "A", "value": None})98    def test_simple_filter_with_grouping(self):99        self.test_process = Process(target=self._start_process, args=(self.consume_filter, 2))100        self.test_process.start()101        self.channel.queue_declare(queue=CONSUME_QUEUE)102        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,103                                   body=json.dumps({"type": "A", "value": 4.2}))104        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,105                                   body=json.dumps({"type": "B", "value": 85}))106        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,107                                   body=json.dumps({"type": "C", "value": "a"}))108        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,109                                   body=json.dumps({"type": "D", "value": 4}))110        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,111                                   body=json.dumps({"type": "A", "value": 2.2}))112        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,113                                   body=json.dumps({"type": "A", "value": 4.1}))114        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,115                                   body=json.dumps([{"type": "A", "value": None},116                                                    {"type": "V", "value": None}]))117        processed_data = []118        for _ in range(4):119            processed_data.append(json.loads(self.recv_pipe.recv()))120        self.assertFalse(self.recv_pipe.poll(1))121        self.assertEqual(processed_data[0], [{"type": "A", "value": 4.2}])122        self.assertEqual(processed_data[1], [{"type": "A", "value": 2.2}])123        self.assertEqual(processed_data[2], [{"type": "A", "value": 4.1}])124        self.assertEqual(processed_data[3], [{"type": "A", "value": None}])125    def test_simple_multipy_message_with_grouping(self):126        self.test_process = Process(target=self._start_process, args=(self.publish_multiple, 2))127        self.test_process.start()128        self.channel.queue_declare(queue=CONSUME_QUEUE)129        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,130                                   body=json.dumps({"type": "A", "value": 4.2}))131        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,132                                   body=json.dumps({"type": "B", "value": 7}))133        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,134                                   body=json.dumps({"type": "C", "value": "a"}))135        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,136                                   body=json.dumps({"type": "D", "value": 1}))137        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,138                                   body=json.dumps([{"type": "A", "value": None},139                                                    {"type": "V", "value": None}]))140        processed_data = []141        for _ in range(7):142            processed_data.append(json.loads(self.recv_pipe.recv()))143        self.assertFalse(self.recv_pipe.poll(1))144        self.assertEqual(processed_data[0], [{"type": "A"}, {"type": "A"}])145        self.assertEqual(processed_data[1], [{"type": "A"}, {"type": "A"}])146        self.assertEqual(processed_data[2], [{"type": "B"}, {"type": "B"}])147        self.assertEqual(processed_data[3], [{"type": "B"}, {"type": "B"}])148        self.assertEqual(processed_data[4], [{"type": "B"}, {"type": "B"}])149        self.assertEqual(processed_data[5], [{"type": "B"}])150        self.assertEqual(processed_data[6], [{"type": "D"}])151    def test_idempotency_set_integration(self):152        self.test_process = Process(target=self._start_process,153                                    args=(self.publish_multiple, 2, self.message_set))154        self.test_process.start()155        self.channel.queue_declare(queue=CONSUME_QUEUE)156        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,157                                   body=json.dumps({"type": "A", "value": 4.2}))158        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,159                                   body=json.dumps({"type": "B", "value": 7}))160        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,161                                   body=json.dumps({"type": "C", "value": "a"}))162        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,163                                   body=json.dumps({"type": "D", "value": 1}))164        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,165                                   body=json.dumps([{"type": "A", "value": 4.2},166                                                    {"type": "V", "value": 1}]))167        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,168                                   body=json.dumps({"type": "D", "value": 1}))169        processed_data = []170        for _ in range(8):171            processed_data.append(json.loads(self.recv_pipe.recv()))172        self.assertFalse(self.recv_pipe.poll(1))173        self.assertEqual(processed_data[0], [{"type": "A"}, {"type": "A"}])174        self.assertEqual(processed_data[1], [{"type": "A"}, {"type": "A"}])175        self.assertEqual(processed_data[2], [{"type": "B"}, {"type": "B"}])176        self.assertEqual(processed_data[3], [{"type": "B"}, {"type": "B"}])177        self.assertEqual(processed_data[4], [{"type": "B"}, {"type": "B"}])178        self.assertEqual(processed_data[5], [{"type": "B"}])179        self.assertEqual(processed_data[6], [{"type": "D"}])180        self.assertEqual(processed_data[7], [{"type": "V"}])181    def test_simple_stop(self):182        self.test_process = Process(target=self._start_process,183                                    args=(self.republish_and_stop_with_key_z, 2, self.message_set))184        self.test_process.start()185        self.channel.queue_declare(queue=CONSUME_QUEUE)186        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,187                                   body=json.dumps({"key": "A", "value": 4.2}))188        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,189                                   body=json.dumps({"key": "Z", "value": 7}))190        self.channel.basic_publish(exchange='', routing_key=CONSUME_QUEUE,191                                   body=json.dumps({"key": "C", "value": "a"}))192        self.test_process.join()193        processed_data = []194        for _ in range(2):195            processed_data.append(json.loads(self.recv_pipe.recv()))196        self.assertFalse(self.recv_pipe.poll(1))197        self.assertEqual(processed_data[0], [{"key": "A", "value": 4.2}])...ch15-ch16-serverpool-and-queue.py
Source:ch15-ch16-serverpool-and-queue.py  
...64        return random.gauss( 1200, 5 )65    if global_time > 2200:66        return random.gauss( 800, 5 )67    return random.gauss( 1000, 5 )68def consume_queue():69    a, b = 20, 270    return 100*random.betavariate( a, b ) # mean: a/(a+b); var: ~b/a^271# ============================================================72# Server Pool73def statictest( traffic ):74    def loadqueue():75        return random.gauss( traffic, traffic/200 )76    77    fb.static_test( ServerPool, ( 0, consume_queue, loadqueue ),78                    20, 20, 5, 1000 ) # max u, steps, trials, timesteps79def closedloop1():80    # Closed loop, setpoint 0.6-0.8, PID Controller81    82    def loadqueue():...rabbitmq.py
Source:rabbitmq.py  
1import pika2from os import environ3from dotenv import load_dotenv4from utils.log_utils import log5load_dotenv()6class RabbitmqService:7    def __init__(self, consume_queue='', post_routing_key='', post_exchange='', post_queue=''):8        self.consume_queue = consume_queue9        self.post_routing_key = post_routing_key10        self.post_exchange = post_exchange11        self.post_queue = post_queue12        self.log = log13        self._create_connection()14        if consume_queue:15            self.channel.queue_declare(queue=consume_queue)16        if post_queue:17            self.channel.queue_declare(queue=post_queue)18        if post_exchange:19            self.channel.queue_declare(queue=post_queue)20            self.channel.exchange_declare(exchange=post_exchange)21            self.channel.queue_bind(post_queue, post_exchange, post_routing_key)22    def _create_connection(self):23        self.connection = pika.BlockingConnection(24            pika.ConnectionParameters(environ.get('RABBITMQ_URL'), 25                                      environ.get('RABBITMQ_PORT'), 26                                      virtual_host=environ.get('RABBITMQ_VHOST')))27        self.channel = self.connection.channel()28    def consume(self, callback):29        while True:30            try:31                self.channel.basic_qos(prefetch_count=1)32                self.channel.basic_consume(queue=self.consume_queue,33                                        on_message_callback=callback,34                                        auto_ack=True35                                        )36                return self.channel.start_consuming()37            except pika.exceptions.ConnectionClosedByBroker:38                break39            except pika.exceptions.AMQPChannelError:40                break41            except pika.exceptions.AMQPConnectionError:42                continue43    def post(self, message):44        self.channel.basic_publish(exchange=self.post_exchange,45                                   routing_key=self.post_routing_key,46                                   body=message)47        self.log.info("Sent validation response [%s]" % message)48    def close_connection(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
