Best Python code snippet using locust
DouYu.py
Source:DouYu.py  
1# -*- coding: utf-8 -*-2# author:      YYT3# create_time: 2021/8/5  15:534import queue5import time6from threading import Thread7from lib.log.log_util import logging8from .payload import PayloadMSG9from .TCP_socket import TCPSocket10# æ¥å
¥æ£æ¥æ°æ®11class DouYu(object):12    def __init__(self, room_id,13                 heartbeat_interval,14                 barrage_host,15                 barrage_port):16        self.room_id = room_id17        self.heartbeat_interval = heartbeat_interval18        self.barrage_host = barrage_host19        self.barrage_port = barrage_port20        self.tcp_socket = TCPSocket(self.barrage_host, self.barrage_port)21        self.message_worker = MessageWorker(self.tcp_socket, self.room_id)22        self.heartbeat_worker = HeartbeatWorker(self.tcp_socket, self.heartbeat_interval)23    def add_handler(self, msg_type, handler):24        self.message_worker.add_handler(msg_type, handler)25    def set_heartbeat_interval(self, heartbeat_interval):26        self.heartbeat_interval = heartbeat_interval27    def refresh_object(self):28        self.tcp_socket = TCPSocket(self.barrage_host, self.barrage_port)29        self.message_worker = MessageWorker(self.tcp_socket, self.room_id)30        self.heartbeat_worker = HeartbeatWorker(self.tcp_socket, self.heartbeat_interval)31    def set_room_id(self, room_id):32        self.room_id = room_id33    def start(self):34        self.tcp_socket.connect()35        self.message_worker.start()36        self.heartbeat_worker.start()37    def stop(self):38        self.message_worker.set_stop()39        self.heartbeat_worker.set_stop()40        self.tcp_socket.close()41        self.refresh_object()42class HeartbeatWorker(Thread):43    def __init__(self, sock, heartbeat_interval=45):44        Thread.__init__(self)45        self.need_stop = False46        self.socket = sock47        self.heartbeat_interval = heartbeat_interval48    def set_stop(self, need_stop=True):49        self.need_stop = need_stop50    def run(self):51        while not self.need_stop:52            data = PayloadMSG.assemble_heartbeat_str()53            self.socket.send(data)54            time.sleep(self.heartbeat_interval)55class MessageConsumer(Thread):56    def __init__(self, msg_queue):57        Thread.__init__(self)58        self.need_stop = False59        self.msg_queue = msg_queue60        self.handlers = {}61    def add_handler(self, msg_type, handler):62        if msg_type not in self.handlers:63            self.handlers[msg_type] = []64        self.handlers[msg_type].append(handler)65    def set_stop(self, need_stop=True):66        self.need_stop = need_stop67    def run(self):68        while not self.need_stop:69            data = self.msg_queue.get()70            ori_str = PayloadMSG.extract_str_from_data(data)71            msg = PayloadMSG.parse_str_to_dict(ori_str)72            try:73                msg_type = msg['type']74                if msg_type in self.handlers:75                    for handler in self.handlers[msg_type]:76                        handler(msg, msg_type)77            except Exception as e:78                logging.warning("Invalid msg received. Exception: %s"79                                % e)80            self.msg_queue.task_done()81class MessageWorker(Thread):82    def __init__(self, sock, room_id):83        Thread.__init__(self)84        self.need_stop = False85        self.socket = sock86        self.room_id = room_id87        self.msg_queue = queue.Queue()88        self.message_consumer = MessageConsumer(self.msg_queue)89    def add_handler(self, msg_type, handler):90        self.message_consumer.add_handler(msg_type, handler)91    def set_stop(self, need_stop=True):92        self.need_stop = need_stop93        self.message_consumer.set_stop(need_stop)94    def prepare(self):95        self.message_consumer.start()96        self.enter_room()97    def enter_room(self):98        data = PayloadMSG.assemble_login_str(self.room_id)99        self.socket.send(data)100        logging.info("login is successful")101        data = PayloadMSG.assemble_join_group_str(self.room_id)102        self.socket.send(data)103        logging.info("group is successful")104    def run(self):105        self.prepare()106        while not self.need_stop:107            packet_size = self.socket.receive(4)108            if packet_size is None:109                logging.warning("Socket closed")110                self.socket.connect()111                self.enter_room()112                continue113            packet_size = int.from_bytes(packet_size, byteorder='little')114            data = self.socket.receive(packet_size)115            if data is None:116                logging.warning("Socket closed")117                self.socket.connect()118                self.enter_room()119                continue...libmango.py
Source:libmango.py  
...27            self.poller.register(s,zmq.POLLIN)28            self.hb_server = self.context.socket(zmq.ROUTER)29            self.hb_server.bind("inproc://heartbeat")30            self.add_socket(self.hb_server, self.heartbeat_handler, self.heartbeat_handler)31    def heartbeat_worker(self, context):32        s = context.socket(zmq.DEALER)33        s.connect("inproc://heartbeat")34        while True:35            time.sleep(30)36            s.send_string("heartbeat")37            38    def heartbeat_handler(self):39        self.hb_server.recv()40        self.hb_server.recv()41        self.m_send("alive",{})42            43    def add_socket(self, sock, recv_cb, err_cb):44        dataflow = lambda: None45        dataflow.recv = recv_cb...client.py
Source:client.py  
1from .message_worker import MessageWorker2from .heartbeat_worker import HeartbeatWorker3from .tcp_socket import TCPSocket4class Client(object):5    def __init__(self, room_id=562590, heartbeat_interval=45,6                 barrage_host="119.96.201.28",7                 barrage_port=8601):8        self.room_id = room_id9        self.heartbeat_interval = heartbeat_interval10        self.barrage_host = barrage_host11        self.barrage_port = barrage_port12        self.tcp_socket = TCPSocket(self.barrage_host, self.barrage_port)13        self.message_worker = MessageWorker(self.tcp_socket, self.room_id)14        self.heartbeat_worker = HeartbeatWorker(self.tcp_socket, self.heartbeat_interval)15    def add_handler(self, msg_type, handler):16        self.message_worker.add_handler(msg_type, handler)17    def set_heartbeat_interval(self, heartbeat_interval):18        self.heartbeat_interval = heartbeat_interval19    def refresh_object(self):20        self.tcp_socket = TCPSocket(self.barrage_host, self.barrage_port)21        self.message_worker = MessageWorker(self.tcp_socket, self.room_id)22        self.heartbeat_worker = HeartbeatWorker(self.tcp_socket, self.heartbeat_interval)23    def set_room_id(self, room_id):24        self.room_id = room_id25    def start(self):26        self.tcp_socket.connect()27        self.message_worker.start()28        self.heartbeat_worker.start()29    def stop(self):30        self.message_worker.set_stop()31        self.heartbeat_worker.set_stop()32        self.tcp_socket.close()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
