Best Python code snippet using tempest_python
backend.py
Source:backend.py  
...23        self.conn = establish_test_connection()24        self.queue = TEST_QUEUE25        self.exchange = TEST_EXCHANGE26        self.routing_key = TEST_ROUTING_KEY27    def create_consumer(self, **options):28        queue = "%s%s" % (self.queue, self.nextq())29        return Consumer(connection=self.conn,30                        queue=queue, exchange=self.exchange,31                        routing_key=self.routing_key, **options)32    def create_consumerset(self, queues={}, consumers=[], **options):33        return ConsumerSet(connection=self.conn,34                           from_dict=queues, consumers=consumers, **options)35    def create_publisher(self, exchange=None, routing_key=None, **options):36        exchange = exchange or self.exchange37        routing_key = routing_key or self.routing_key38        return Publisher(connection=self.conn,39                        exchange=exchange, routing_key=routing_key,40                        **options)41    def test_regression_implied_auto_delete(self):42        consumer = self.create_consumer(exclusive=True, auto_declare=False)43        self.assertTrue(consumer.auto_delete, "exclusive implies auto_delete")44        consumer.close()45        consumer = self.create_consumer(durable=True, auto_delete=False,46                                        auto_declare=False)47        self.assertFalse(consumer.auto_delete,48            """durable does *not* imply auto_delete.49            regression: http://github.com/ask/carrot/issues/closed#issue/2""")50        consumer.close()51    def test_consumer_options(self):52        opposite_defaults = {53                "queue": "xyxyxyxy",54                "exchange": "xyxyxyxy",55                "routing_key": "xyxyxyxy",56                "durable": False,57                "exclusive": True,58                "auto_delete": True,59                "exchange_type": "topic",60        }61        consumer = Consumer(connection=self.conn, **opposite_defaults)62        for opt_name, opt_value in opposite_defaults.items():63            self.assertEquals(getattr(consumer, opt_name), opt_value)64        consumer.close()65    def test_consumer_backend(self):66        consumer = self.create_consumer()67        self.assertTrue(consumer.backend.connection is self.conn)68        consumer.close()69    def test_consumer_queue_declared(self):70        consumer = self.create_consumer()71        self.assertTrue(consumer.backend.queue_exists(consumer.queue))72        consumer.close()73    def test_consumer_callbacks(self):74        consumer = self.create_consumer()75        publisher = self.create_publisher()76        # raises on no callbacks77        self.assertRaises(NotImplementedError, consumer.receive, {}, {})78        callback1_scratchpad = {}79        def callback1(message_data, message):80            callback1_scratchpad["message_data"] = message_data81        callback2_scratchpad = {}82        def callback2(message_data, message):83            callback2_scratchpad.update({"delivery_tag": message.delivery_tag,84                                         "message_body": message.body})85        self.assertFalse(consumer.callbacks, "no default callbacks")86        consumer.register_callback(callback1)87        consumer.register_callback(callback2)88        self.assertEquals(len(consumer.callbacks), 2, "callbacks registered")89        self.assertTrue(consumer.callbacks[0] is callback1,90                "callbacks are ordered")91        self.assertTrue(consumer.callbacks[1] is callback2,92                "callbacks are ordered")93        body = {"foo": "bar"}94        message = self.create_raw_message(publisher, body, "Elaine was here")95        consumer._receive_callback(message)96        self.assertEquals(callback1_scratchpad.get("message_data"), body,97                "callback1 was called")98        self.assertEquals(callback2_scratchpad.get("delivery_tag"),99                "Elaine was here")100        consumer.close()101        publisher.close()102    def create_raw_message(self, publisher, body, delivery_tag):103        raw_message = publisher.create_message(body)104        raw_message.delivery_tag = delivery_tag105        return raw_message106    def test_empty_queue_returns_None(self):107        consumer = self.create_consumer()108        consumer.discard_all()109        self.assertFalse(consumer.fetch())110        consumer.close()111    def test_custom_serialization_scheme(self):112        serialization.registry.register('custom_test',113                pickle.dumps, pickle.loads,114                content_type='application/x-custom-test',115                content_encoding='binary')116        consumer = self.create_consumer()117        publisher = self.create_publisher()118        consumer.discard_all()119        data = {"string": "The quick brown fox jumps over the lazy dog",120                "int": 10,121                "float": 3.14159265,122                "unicode": u"The quick brown fox jumps over the lazy dog",123                "advanced": AdvancedDataType("something"),124                "set": set(["george", "jerry", "elaine", "cosmo"]),125                "exception": Exception("There was an error"),126        }127        publisher.send(data, serializer='custom_test')128        message = fetch_next_message(consumer)129        backend = self.conn.create_backend()130        self.assertTrue(isinstance(message, backend.Message))131        self.assertEquals(message.payload.get("int"), 10)132        self.assertEquals(message.content_type, 'application/x-custom-test')133        self.assertEquals(message.content_encoding, 'binary')134        decoded_data = message.decode()135        self.assertEquals(decoded_data.get("string"),136                "The quick brown fox jumps over the lazy dog")137        self.assertEquals(decoded_data.get("int"), 10)138        self.assertEquals(decoded_data.get("float"), 3.14159265)139        self.assertEquals(decoded_data.get("unicode"),140                u"The quick brown fox jumps over the lazy dog")141        self.assertEquals(decoded_data.get("set"),142            set(["george", "jerry", "elaine", "cosmo"]))143        self.assertTrue(isinstance(decoded_data.get("exception"), Exception))144        self.assertEquals(decoded_data.get("exception").args[0],145            "There was an error")146        self.assertTrue(isinstance(decoded_data.get("advanced"),147            AdvancedDataType))148        self.assertEquals(decoded_data["advanced"].data, "something")149        consumer.close()150        publisher.close()151    def test_consumer_fetch(self):152        consumer = self.create_consumer()153        publisher = self.create_publisher()154        consumer.discard_all()155        data = {"string": "The quick brown fox jumps over the lazy dog",156                "int": 10,157                "float": 3.14159265,158                "unicode": u"The quick brown fox jumps over the lazy dog",159        }160        publisher.send(data)161        message = fetch_next_message(consumer)162        backend = self.conn.create_backend()163        self.assertTrue(isinstance(message, backend.Message))164        self.assertEquals(message.decode(), data)165        consumer.close()166        publisher.close()167    def test_consumer_process_next(self):168        consumer = self.create_consumer()169        publisher = self.create_publisher()170        consumer.discard_all()171        scratchpad = {}172        def callback(message_data, message):173            scratchpad["delivery_tag"] = message.delivery_tag174        consumer.register_callback(callback)175        publisher.send({"name_discovered": {176                            "first_name": "Cosmo",177                            "last_name": "Kramer"}})178        while True:179            message = consumer.fetch(enable_callbacks=True)180            if message:181                break182        self.assertEquals(scratchpad.get("delivery_tag"),183                message.delivery_tag)184        consumer.close()185        publisher.close()186    def test_consumer_discard_all(self):187        consumer = self.create_consumer()188        publisher = self.create_publisher()189        consumer.discard_all()190        for i in xrange(100):191            publisher.send({"foo": "bar"})192        time.sleep(0.5)193        self.assertEquals(consumer.discard_all(), 100)194        consumer.close()195        publisher.close()196    def test_iterqueue(self):197        consumer = self.create_consumer()198        publisher = self.create_publisher()199        num = consumer.discard_all()200        it = consumer.iterqueue(limit=100)201        consumer.register_callback(lambda *args: args)202        for i in xrange(100):203            publisher.send({"foo%d" % i: "bar%d" % i})204        time.sleep(0.5)205        for i in xrange(100):206            try:207                message = it.next()208                data = message.decode()209                self.assertTrue("foo%d" % i in data, "foo%d not in data" % i)210                self.assertEquals(data.get("foo%d" % i), "bar%d" % i)211            except StopIteration:212                self.assertTrue(False, "iterqueue fails StopIteration")213        self.assertRaises(StopIteration, it.next)214        # no messages on queue raises StopIteration if infinite=False215        it = consumer.iterqueue()216        self.assertRaises(StopIteration, it.next)217        it = consumer.iterqueue(infinite=True)218        self.assertTrue(it.next() is None,219                "returns None if no messages and inifite=True")220        consumer.close()221        publisher.close()222    def test_publisher_message_priority(self):223        consumer = self.create_consumer()224        publisher = self.create_publisher()225        consumer.discard_all()226        m = publisher.create_message("foo", priority=9)227        publisher.send({"foo": "bar"}, routing_key="nowhere", priority=9,228                mandatory=False, immediate=False)229        consumer.discard_all()230        consumer.close()231        publisher.close()232    def test_backend_survives_channel_close_regr17(self):233        """234        test that a backend instance is still functional after235        a method that results in a channel closure.236        """237        backend = self.create_publisher().backend238        assert not backend.queue_exists('notaqueue')239        # after calling this once, the channel seems to close, but the240        # backend may be holding a reference to it...241        assert not backend.queue_exists('notaqueue')242    def disabled_publisher_mandatory_flag_regr16(self):243        """244        Test that the publisher "mandatory" flag245        raises exceptions at appropriate times.246        """247        routing_key = 'black_hole'248        assert self.conn.connection is not None249        message = {'foo': 'mandatory'}250        # sanity check cleanup from last test251        assert not self.create_consumer().backend.queue_exists(routing_key)252        publisher = self.create_publisher()253        # this should just get discarded silently, it's not mandatory254        publisher.send(message, routing_key=routing_key, mandatory=False)255        # This raises an unspecified exception because there is no queue to256        # deliver to257        self.assertRaises(Exception, publisher.send, message,258                          routing_key=routing_key, mandatory=True)259        # now bind a queue to it260        consumer = Consumer(connection=self.conn,261                            queue=routing_key, exchange=self.exchange,262                            routing_key=routing_key, durable=False,263                            exclusive=True)264        # check that it exists265        assert self.create_consumer().backend.queue_exists(routing_key)266        # this should now get routed to our consumer with no exception267        publisher.send(message, routing_key=routing_key, mandatory=True)268    def test_consumer_auto_ack(self):269        consumer = self.create_consumer(auto_ack=True)270        publisher = self.create_publisher()271        consumer.discard_all()272        publisher.send({"foo": "Baz"})273        message = fetch_next_message(consumer)274        self.assertEquals(message._state, "ACK")275        consumer.close()276        publisher.close()277        publisher = self.create_publisher()278        consumer = self.create_consumer(auto_ack=False)279        publisher.send({"foo": "Baz"})280        message = fetch_next_message(consumer)281        self.assertEquals(message._state, "RECEIVED")282        consumer.close()283        publisher.close()284    def test_consumer_consume(self):285        consumer = self.create_consumer(auto_ack=True)286        publisher = self.create_publisher()287        consumer.discard_all()288        data = {"foo": "Baz"}289        publisher.send(data)290        try:291            data2 = {"company": "Vandelay Industries"}292            publisher.send(data2)293            scratchpad = {}294            def callback(message_data, message):295                scratchpad["data"] = message_data296            consumer.register_callback(callback)297            it = consumer.iterconsume()298            it.next()299            self.assertEquals(scratchpad.get("data"), data)300            it.next()301            self.assertEquals(scratchpad.get("data"), data2)302            # Cancel consumer/close and restart.303            consumer.close()304            consumer = self.create_consumer(auto_ack=True)305            consumer.register_callback(callback)306            consumer.discard_all()307            scratchpad = {}308            # Test limits309            it = consumer.iterconsume(limit=4)310            publisher.send(data)311            publisher.send(data2)312            publisher.send(data)313            publisher.send(data2)314            publisher.send(data)315            it.next()316            self.assertEquals(scratchpad.get("data"), data)317            it.next()318            self.assertEquals(scratchpad.get("data"), data2)...service_templates.py
Source:service_templates.py  
...19from ..helpers import (get_example_uri, get_service_template_uri)20def consume_literal(literal, consumer_class_name='instance', cache=True, no_issues=True):21    cachedmethod.ENABLED = cache22    context = create_context(LiteralLocation(literal))23    consumer, dumper = create_consumer(context, consumer_class_name)24    consumer.consume()25    if no_issues:26        context.validation.dump_issues()27        assert not context.validation.has_issues28    return context, dumper29def consume_use_case(use_case_name, consumer_class_name='instance', cache=True):30    cachedmethod.ENABLED = cache31    uri = get_example_uri('tosca-simple-1.0', 'use-cases', use_case_name,32                          '{0}.yaml'.format(use_case_name))33    context = create_context(uri)34    inputs_file = get_example_uri('tosca-simple-1.0', 'use-cases', use_case_name, 'inputs.yaml')35    if os.path.isfile(inputs_file):36        context.args.append('--inputs={0}'.format(inputs_file))37    consumer, dumper = create_consumer(context, consumer_class_name)38    consumer.consume()39    context.validation.dump_issues()40    assert not context.validation.has_issues41    return context, dumper42def consume_types_use_case(use_case_name, consumer_class_name='instance', cache=True):43    cachedmethod.ENABLED = cache44    uri = get_service_template_uri('tosca-simple-1.0', 'types', use_case_name,45                                   '{0}.yaml'.format(use_case_name))46    context = create_context(uri)47    inputs_file = get_example_uri('tosca-simple-1.0', 'types', use_case_name, 'inputs.yaml')48    if os.path.isfile(inputs_file):49        context.args.append('--inputs={0}'.format(inputs_file))50    consumer, dumper = create_consumer(context, consumer_class_name)51    consumer.consume()52    context.validation.dump_issues()53    assert not context.validation.has_issues54    return context, dumper55def consume_node_cellar(consumer_class_name='instance', cache=True):56    consume_test_case(57        get_service_template_uri('tosca-simple-1.0', 'node-cellar', 'node-cellar.yaml'),58        consumer_class_name=consumer_class_name,59        inputs_uri=get_service_template_uri('tosca-simple-1.0', 'node-cellar', 'inputs.yaml'),60        cache=cache61    )62def consume_test_case(uri, inputs_uri=None, consumer_class_name='instance', cache=True):63    cachedmethod.ENABLED = cache64    uri = get_service_template_uri(uri)65    context = create_context(uri)66    if inputs_uri:67        context.args.append('--inputs=' + get_service_template_uri(inputs_uri))68    consumer, dumper = create_consumer(context, consumer_class_name)69    consumer.consume()70    context.validation.dump_issues()71    assert not context.validation.has_issues...consumerController.py
Source:consumerController.py  
2import simplejson as json3class ConsumerController:4    def consume_tweets(self):5        kafka = Kafka()6        customer = kafka.create_consumer(topic="sentiment_topic", group_id="test")7        while True:8            try:9                msg = customer.poll()10                if msg is None:11                    continue12                if msg.error():13                    print("Consumer customer error: {}".format(msg.error()))14                    continue15                json_data = json.loads(msg.value().decode("utf-8"))16                print(json_data)17            except Exception as err:18                print(f"something went wrong {err}")19    def consume_tweets_agg(self):20        kafka = Kafka()21        customer = kafka.create_consumer(topic="sentiment_agg_topic", group_id="test")22        while True:23            try:24                msg = customer.poll()25                if msg is None:26                    continue27                if msg.error():28                    print("Consumer customer error: {}".format(msg.error()))29                    continue30                json_data = json.loads(msg.value().decode("utf-8"))31                print(json_data)32            except Exception as err:33                print(f"something went wrong {err}")34    def consume_tweets_word_agg(self):35        kafka = Kafka()36        customer = kafka.create_consumer(topic="sentiment_word_agg_topic", group_id="test")37        while True:38            try:39                msg = customer.poll()40                if msg is None:41                    continue42                if msg.error():43                    print("Consumer customer error: {}".format(msg.error()))44                    continue45                json_data = json.loads(msg.value().decode("utf-8"))46                print(json_data)47            except Exception as err:48                print(f"something went wrong {err}")49    def consume_tweets_agg_groupby(self):50        kafka = Kafka()51        customer = kafka.create_consumer(topic="sentiment_agg_groupby_topic", group_id="test")52        while True:53            try:54                msg = customer.poll()55                if msg is None:56                    continue57                if msg.error():58                    print("Consumer customer error: {}".format(msg.error()))59                    continue60                json_data = json.loads(msg.value().decode("utf-8"))61                print(json_data)62            except Exception as err:63                print(f"something went wrong {err}")64    def consume_tweets_agg_word2(self):65        kafka = Kafka()66        customer = kafka.create_consumer(topic="sentiment_word_agg_topic2", group_id="test")67        while True:68            try:69                msg = customer.poll()70                if msg is None:71                    continue72                if msg.error():73                    print("Consumer customer error: {}".format(msg.error()))74                    continue75                json_data = json.loads(msg.value().decode("utf-8"))76                print(json_data)77            except Exception as err:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
