Best Python code snippet using lemoncheesecake
broker.py
Source:broker.py  
EVENT_BROKER_CLIENT_DISCONNECTED = 'broker_client_disconnected'
EVENT_BROKER_CLIENT_SUBSCRIBED = 'broker_client_subscribed'
EVENT_BROKER_CLIENT_UNSUBSCRIBED = 'broker_client_unsubscribed'
EVENT_BROKER_MESSAGE_RECEIVED = 'broker_message_received'


def make_log(string):
    """Append ``string`` as a timestamped line to ``BrokerStatus.txt``.

    The file is opened relative to the current working directory and is
    created if missing. It is closed again before returning, so a handle is
    no longer leaked on every call.

    :param string: message text to record
    """
    st = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
    with open("BrokerStatus.txt", "a") as f:
        f.write("[" + st + "]  " + string + '\n')


class BrokerException(Exception):
    """Raised for broker configuration, start-up and shutdown errors.

    Derives from ``Exception`` (not ``BaseException``) so that the generic
    ``except Exception`` handler in :meth:`Broker.start` also sees the
    configuration errors raised while listeners are being created.
    """
    pass


class RetainedApplicationMessage:
    """Plain record of a message kept for later delivery."""

    def __init__(self, source_session, topic, data, qos=None):
        self.source_session = source_session
        self.topic = topic
        self.data = data
        self.qos = qos


class Server:
    """Bookkeeping wrapper around one listening server instance.

    Tracks the number of open connections and, when ``max_connections`` is
    positive, limits them with a semaphore.
    """

    def __init__(self, listener_name, server_instance, max_connections=-1, loop=None):
        self.logger = logging.getLogger(__name__)
        self.instance = server_instance
        self.conn_count = 0
        self.listener_name = listener_name
        self._loop = loop if loop is not None else asyncio.get_event_loop()
        self.max_connections = max_connections
        if self.max_connections > 0:
            self.semaphore = asyncio.Semaphore(self.max_connections, loop=self._loop)
        else:
            # No limit configured: connections are only counted.
            self.semaphore = None

    def _log_count(self):
        """Write the current connection count to the status log."""
        if self.max_connections > 0:
            make_log("Listener '%s': %d/%d connections acquired" %
                     (self.listener_name, self.conn_count, self.max_connections))
        else:
            make_log("Listener '%s': %d connections acquired" %
                     (self.listener_name, self.conn_count))

    @asyncio.coroutine
    def acquire_connection(self):
        """Wait for a free connection slot (if limited) and count it."""
        if self.semaphore:
            yield from self.semaphore.acquire()
        self.conn_count += 1
        self._log_count()

    def release_connection(self):
        """Give a connection slot back and decrement the counter."""
        if self.semaphore:
            self.semaphore.release()
        self.conn_count -= 1
        self._log_count()

    @asyncio.coroutine
    def close_instance(self):
        """Close the wrapped server instance and wait until it is closed."""
        if self.instance:
            self.instance.close()
            yield from self.instance.wait_closed()


class BrokerContext(BaseContext):
    """
    BrokerContext is used as the context passed to plugins interacting with the broker.
    It acts as an adapter to broker services from plugins developed for HBMQTT broker
    """

    def __init__(self, broker):
        super().__init__()
        self.config = None
        self._broker_instance = broker

    @asyncio.coroutine
    def broadcast_message(self, topic, data, qos=None):
        """Queue a broker-originated message for broadcasting."""
        yield from self._broker_instance.internal_message_broadcast(topic, data, qos)

    def retain_message(self, topic_name, data, qos=None):
        """Store (or clear) a retained message that has no source session."""
        self._broker_instance.retain_message(None, topic_name, data, qos)

    @property
    def sessions(self):
        """Iterate over the session objects known to the broker."""
        for session, _handler in self._broker_instance._sessions.values():
            yield session

    @property
    def retained_messages(self):
        return self._broker_instance._retained_messages

    @property
    def subscriptions(self):
        return self._broker_instance._subscriptions


class Broker:
    """
    MQTT 3.1.1 compliant broker implementation

    :param config: Example Yaml config
    :param loop: asyncio loop to use. Defaults to ``asyncio.get_event_loop()`` if none is given
    :param plugin_namespace: Plugin namespace to use when loading plugin entry_points. Defaults to ``hbmqtt.broker.plugins``
    """
    # 'stopped' used to be listed twice; each state is now declared once.
    states = ['new', 'starting', 'started', 'not_started', 'stopping', 'stopped', 'not_stopped']

    def __init__(self, config=None, loop=None, plugin_namespace=None):
        self.logger = logging.getLogger(__name__)
        # Copy the defaults: updating the shared module-level dict in place
        # would leak one broker's configuration into every later instance.
        self.config = dict(_defaults)
        if config is not None:
            self.config.update(config)
        self._build_listeners_config(self.config)

        self._loop = loop if loop is not None else asyncio.get_event_loop()

        self._servers = dict()
        self._init_states()
        self._sessions = dict()
        self._subscriptions = dict()
        self._retained_messages = dict()
        self._broadcast_queue = asyncio.Queue(loop=self._loop)
        self._broadcast_task = None
        self.ClientID = ""

        # Init plugins manager
        context = BrokerContext(self)
        context.config = self.config
        namespace = plugin_namespace if plugin_namespace else 'hbmqtt.broker.plugins'
        self.plugins_manager = PluginManager(namespace, context, self._loop)

    def _build_listeners_config(self, broker_config):
        """Build ``self.listeners_config``: each listener entry layered over 'default'.

        :raises BrokerException: if the 'listeners' or 'default' key is missing
        """
        self.listeners_config = dict()
        try:
            listeners_config = broker_config['listeners']
            defaults = listeners_config['default']
            for listener in listeners_config:
                config = dict(defaults)
                config.update(listeners_config[listener])
                self.listeners_config[listener] = config
        except KeyError as ke:
            raise BrokerException("Listener config not found invalid: %s" % ke) from ke

    def _init_states(self):
        """Create the broker life-cycle state machine and its transitions."""
        self.transitions = Machine(states=Broker.states, initial='new')
        self.transitions.add_transition(trigger='start', source='new', dest='starting')
        self.transitions.add_transition(trigger='starting_fail', source='starting', dest='not_started')
        self.transitions.add_transition(trigger='starting_success', source='starting', dest='started')
        self.transitions.add_transition(trigger='shutdown', source='started', dest='stopping')
        self.transitions.add_transition(trigger='stopping_success', source='stopping', dest='stopped')
        self.transitions.add_transition(trigger='stopping_failure', source='stopping', dest='not_stopped')
        self.transitions.add_transition(trigger='start', source='stopped', dest='starting')

    @staticmethod
    def _build_ssl_context(listener):
        """Return an SSL context for ``listener``, or ``None`` when SSL is off.

        The 'ssl' option accepts a boolean or the strings "on" / "off".

        :raises BrokerException: if 'certfile'/'keyfile' are missing or unreadable
        """
        ssl_active = listener.get('ssl', False)
        if isinstance(ssl_active, str):
            ssl_active = ssl_active.upper() == 'ON'
        if not ssl_active:
            return None
        try:
            sc = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
            sc.load_cert_chain(listener['certfile'], listener['keyfile'])
            sc.verify_mode = ssl.CERT_OPTIONAL
        except KeyError as ke:
            raise BrokerException("'certfile' or 'keyfile' configuration parameter missing: %s" % ke) from ke
        except FileNotFoundError as fnfe:
            raise BrokerException("Can't read cert files '%s' or '%s' : %s" %
                                  (listener['certfile'], listener['keyfile'], fnfe)) from fnfe
        return sc

    @asyncio.coroutine
    def start(self):
        """
            Start the broker to serve with the given configuration

            Start method opens network sockets and will start listening for incoming connections.

            This method is a *coroutine*.

            :raises BrokerException: if the broker is not in a startable state
                or if any listener cannot be created
        """
        try:
            # Validate the state transition first, so that an invalid call on
            # a running broker does not wipe its live state before failing.
            self.transitions.start()
        except MachineError as me:
            make_log("[WARN-0001] Invalid method call at this moment: %s" % me)
            raise BrokerException("Broker instance can't be started: %s" % me) from me
        self._sessions = dict()
        self._subscriptions = dict()
        self._retained_messages = dict()
        make_log("Broker starting")

        yield from self.plugins_manager.fire_event(EVENT_BROKER_PRE_START)
        try:
            # Start network listeners
            for listener_name, listener in self.listeners_config.items():
                if 'bind' not in listener:
                    make_log("Listener configuration '%s' is not bound" % listener_name)
                    continue

                max_connections = listener.get('max_connections', -1)
                sc = self._build_ssl_context(listener)

                address, s_port = listener['bind'].split(':')
                try:
                    port = int(s_port)
                except ValueError as ve:
                    raise BrokerException("Invalid port value in bind value: %s" % listener['bind']) from ve

                if listener['type'] == 'tcp':
                    cb_partial = partial(self.stream_connected, listener_name=listener_name)
                    instance = yield from asyncio.start_server(cb_partial,
                                                               address,
                                                               port,
                                                               ssl=sc,
                                                               loop=self._loop)
                    self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop)
                elif listener['type'] == 'ws':
                    cb_partial = partial(self.ws_connected, listener_name=listener_name)
                    instance = yield from websockets.serve(cb_partial, address, port, ssl=sc, loop=self._loop,
                                                           subprotocols=['mqtt'])
                    self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop)

                make_log("Listener '%s' bind to %s (max_connections=%d)" %
                         (listener_name, listener['bind'], max_connections))

            self.transitions.starting_success()
            yield from self.plugins_manager.fire_event(EVENT_BROKER_POST_START)

            # Start broadcast loop
            self._broadcast_task = ensure_future(self._broadcast_loop(), loop=self._loop)

            make_log("Broker started")
        except Exception as e:
            make_log("Broker startup failed: %s" % e)
            self.transitions.starting_fail()
            raise BrokerException("Broker instance can't be started: %s" % e) from e

    @asyncio.coroutine
    def shutdown(self):
        """
            Stop broker instance.

            Closes all connected session, stop listening on network socket and free resources.

            :raises BrokerException: if the broker is not in the 'started' state
        """
        try:
            # Transition first: an invalid call must not clear live state.
            self.transitions.shutdown()
        except MachineError as me:
            make_log("Invalid method call at this moment: %s" % me)
            raise BrokerException("Broker instance can't be stopped: %s" % me) from me
        self._sessions = dict()
        self._subscriptions = dict()
        self._retained_messages = dict()

        # Fire broker_shutdown event to plugins
        yield from self.plugins_manager.fire_event(EVENT_BROKER_PRE_SHUTDOWN)

        # Stop broadcast loop
        if self._broadcast_task:
            self._broadcast_task.cancel()
        if self._broadcast_queue.qsize() > 0:
            make_log("%d messages not broadcasted" % self._broadcast_queue.qsize())

        for server in self._servers.values():
            yield from server.close_instance()
        make_log("Broker closing")
        make_log("Broker closed")
        yield from self.plugins_manager.fire_event(EVENT_BROKER_POST_SHUTDOWN)
        self.transitions.stopping_success()

    @asyncio.coroutine
    def internal_message_broadcast(self, topic, data, qos=None):
        """Broadcast a message that has no source session.

        ``qos`` is now forwarded; it used to be accepted and silently dropped.
        """
        return (yield from self._broadcast_message(None, topic, data, qos))

    @asyncio.coroutine
    def ws_connected(self, websocket, uri, listener_name):
        yield from self.client_connected(listener_name, WebSocketsReader(websocket), WebSocketsWriter(websocket))

    @asyncio.coroutine
    def stream_connected(self, reader, writer, listener_name):
        yield from self.client_connected(listener_name, StreamReaderAdapter(reader), StreamWriterAdapter(writer))

    @asyncio.coroutine
    def client_connected(self, listener_name, reader: ReaderAdapter, writer: WriterAdapter):
        """Serve one client connection from CONNECT until it ends.

        The listener's connection slot is released on every exit path. The
        early returns (invalid CONNECT, failed authentication) and unexpected
        exceptions previously skipped the release and leaked slots.

        :raises BrokerException: if ``listener_name`` is not a started listener
        """
        # Wait for connection available on listener
        server = self._servers.get(listener_name, None)
        if not server:
            raise BrokerException("Invalid listener name '%s'" % listener_name)
        yield from server.acquire_connection()
        try:
            yield from self._serve_client(listener_name, reader, writer)
        finally:
            server.release_connection()

    @asyncio.coroutine
    def _serve_client(self, listener_name, reader, writer):
        """Run the CONNECT handshake, authentication and message loop for one client."""
        remote_address, remote_port = writer.get_peer_info()
        make_log("Connection from %s:%d on listener '%s'" % (remote_address, remote_port, listener_name))

        # Wait for first packet and expect a CONNECT
        try:
            handler, client_session = yield from BrokerProtocolHandler.init_from_connect(
                reader, writer, self.plugins_manager, loop=self._loop)
        except HBMQTTException as exc:
            make_log("[MQTT-3.1.0-1] %s: Can't read first packet an CONNECT: %s" %
                     (format_client_message(address=remote_address, port=remote_port), exc))
            # NOTE(review): the writer is deliberately not closed on this path
            # (the close call was already disabled in the original code).
            make_log("Connection closed")
            return
        except MQTTException as me:
            make_log('Invalid connection from %s : %s' %
                     (format_client_message(address=remote_address, port=remote_port), me))
            yield from writer.close()
            make_log("Connection closed")
            return

        # A client-id based read/write permission check (prefix "P"/"S",
        # looked up in mqtt/write.txt and mqtt/read.txt) used to sit here;
        # it was already disabled in the original code.

        if client_session.clean_session:
            # Delete existing session and create a new one
            if client_session.client_id is not None:
                self.delete_session(client_session.client_id)
            else:
                client_session.client_id = gen_client_id()
            client_session.parent = 0
        else:
            # Get session from cache
            if client_session.client_id in self._sessions:
                make_log("Found old session %s" % repr(self._sessions[client_session.client_id]))
                (client_session, _old_handler) = self._sessions[client_session.client_id]
                client_session.parent = 1
            else:
                client_session.parent = 0

        if client_session.keep_alive > 0:
            client_session.keep_alive += self.config['timeout-disconnect-delay']
        make_log("Keep-alive timeout=%d" % client_session.keep_alive)

        handler.attach(client_session, reader, writer)
        self._sessions[client_session.client_id] = (client_session, handler)

        authenticated = yield from self.authenticate(client_session, self.listeners_config[listener_name])
        if not authenticated:
            yield from writer.close()
            return

        while True:
            try:
                client_session.transitions.connect()
                break
            except MachineError:
                make_log("Client %s is reconnecting too quickly, make it wait" % client_session.client_id)
                # Wait a bit may be client is reconnecting too fast
                yield from asyncio.sleep(1, loop=self._loop)
        yield from handler.mqtt_connack_authorize(authenticated)

        yield from self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_CONNECTED,
                                                   client_id=client_session.client_id)

        make_log("%s Start messages handling" % client_session.client_id)
        yield from handler.start()
        make_log("Retained messages queue size: %d" % client_session.retained_messages.qsize())
        yield from self.publish_session_retained_messages(client_session)

        # Init and start loop for handling client messages (publish, subscribe/unsubscribe, disconnect)
        disconnect_waiter = ensure_future(handler.wait_disconnect(), loop=self._loop)
        subscribe_waiter = ensure_future(handler.get_next_pending_subscription(), loop=self._loop)
        unsubscribe_waiter = ensure_future(handler.get_next_pending_unsubscription(), loop=self._loop)
        wait_deliver = ensure_future(handler.mqtt_deliver_next_message(), loop=self._loop)
        try:
            connected = True
            while connected:
                try:
                    done, _pending = yield from asyncio.wait(
                        [disconnect_waiter, subscribe_waiter, unsubscribe_waiter, wait_deliver],
                        return_when=asyncio.FIRST_COMPLETED, loop=self._loop)
                    if disconnect_waiter in done:
                        yield from self._handle_disconnect(handler, client_session, disconnect_waiter.result())
                        connected = False
                    if unsubscribe_waiter in done:
                        yield from self._handle_unsubscription(handler, client_session,
                                                               unsubscribe_waiter.result())
                        unsubscribe_waiter = ensure_future(handler.get_next_pending_unsubscription(),
                                                           loop=self._loop)
                    if subscribe_waiter in done:
                        yield from self._handle_subscription(handler, client_session, subscribe_waiter.result())
                        subscribe_waiter = ensure_future(handler.get_next_pending_subscription(),
                                                         loop=self._loop)
                    if wait_deliver in done:
                        keep_open = yield from self._handle_delivery(client_session, wait_deliver.result())
                        if not keep_open:
                            # NOTE(review): as in the original, the handler is
                            # not stopped and the session is not transitioned
                            # on this path - confirm that is intended.
                            break
                        wait_deliver = ensure_future(handler.mqtt_deliver_next_message(), loop=self._loop)
                except asyncio.CancelledError:
                    make_log("Client loop cancelled")
                    break
        finally:
            # Cancel whatever is still pending, also when an error escapes the loop.
            for waiter in (disconnect_waiter, subscribe_waiter, unsubscribe_waiter, wait_deliver):
                waiter.cancel()
        make_log("%s Client disconnected" % client_session.client_id)

    @asyncio.coroutine
    def _handle_disconnect(self, handler, client_session, result):
        """Publish the will message if needed, then stop the handler and session."""
        make_log("%s Result from wait_diconnect: %s" % (client_session.client_id, result))
        if result is None:
            make_log("Will flag: %s" % client_session.will_flag)
            # Connection closed abnormally, send will message
            if client_session.will_flag:
                make_log("Client %s disconnected abnormally, sending will message" %
                         format_client_message(client_session))
                yield from self._broadcast_message(
                    client_session,
                    client_session.will_topic,
                    client_session.will_message,
                    client_session.will_qos)
                if client_session.will_retain:
                    self.retain_message(client_session,
                                        client_session.will_topic,
                                        client_session.will_message,
                                        client_session.will_qos)
        make_log("%s Disconnecting session" % client_session.client_id)
        yield from self._stop_handler(handler)
        client_session.transitions.disconnect()
        yield from self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_DISCONNECTED,
                                                   client_id=client_session.client_id)

    @asyncio.coroutine
    def _handle_unsubscription(self, handler, client_session, unsubscription):
        """Remove the requested subscriptions, notify plugins and acknowledge."""
        make_log("%s handling unsubscription" % client_session.client_id)
        for topic in unsubscription['topics']:
            self._del_subscription(topic, client_session)
            yield from self.plugins_manager.fire_event(
                EVENT_BROKER_CLIENT_UNSUBSCRIBED,
                client_id=client_session.client_id,
                topic=topic)
        yield from handler.mqtt_acknowledge_unsubscription(unsubscription['packet_id'])

    @asyncio.coroutine
    def _handle_subscription(self, handler, client_session, subscriptions):
        """Register subscriptions, acknowledge them and publish matching retained messages."""
        make_log("%s handling subscription" % client_session.client_id)
        return_codes = [self.add_subscription(subscription, client_session)
                        for subscription in subscriptions['topics']]
        yield from handler.mqtt_acknowledge_subscription(subscriptions['packet_id'], return_codes)
        for subscription, return_code in zip(subscriptions['topics'], return_codes):
            if return_code != 0x80:
                yield from self.plugins_manager.fire_event(
                    EVENT_BROKER_CLIENT_SUBSCRIBED,
                    client_id=client_session.client_id,
                    topic=subscription[0],
                    qos=subscription[1])
                yield from self.publish_retained_messages_for_subscription(subscription, client_session)
        make_log(repr(self._subscriptions))

    @asyncio.coroutine
    def _handle_delivery(self, client_session, app_message):
        """Broadcast a message published by the client.

        :return: ``False`` if the topic is invalid and the connection must be
            dropped, ``True`` otherwise
        """
        if self.logger.isEnabledFor(logging.DEBUG):
            make_log("%s handling message delivery" % client_session.client_id)
        if not app_message.topic:
            self.logger.warning(
                "[MQTT-4.7.3-1] - %s invalid TOPIC sent in PUBLISH message, closing connection" %
                client_session.client_id)
            return False
        if "#" in app_message.topic or "+" in app_message.topic:
            self.logger.warning(
                "[MQTT-3.3.2-2] - %s invalid TOPIC sent in PUBLISH message, closing connection" %
                client_session.client_id)
            return False
        yield from self.plugins_manager.fire_event(EVENT_BROKER_MESSAGE_RECEIVED,
                                                   client_id=client_session.client_id,
                                                   message=app_message)
        yield from self._broadcast_message(client_session, app_message.topic, app_message.data)
        if app_message.publish_packet.retain_flag:
            self.retain_message(client_session, app_message.topic, app_message.data, app_message.qos)
        return True

    def _init_handler(self, session, reader, writer):
        """
        Create a BrokerProtocolHandler and attach to a session
        :return: the attached handler
        """
        handler = BrokerProtocolHandler(self.plugins_manager, self._loop)
        handler.attach(session, reader, writer)
        return handler

    @asyncio.coroutine
    def _stop_handler(self, handler):
        """
        Stop a running handler and detach it from the session.
        Errors raised while stopping are logged, not propagated.
        :param handler:
        :return:
        """
        try:
            yield from handler.stop()
        except Exception as e:
            self.logger.error(e)

    @asyncio.coroutine
    def authenticate(self, session: Session, listener):
        """
        This method calls the authenticate method on registered plugins to test user authentication.
        User is considered authenticated unless a plugin returns False.
        Plugins authenticate() method are supposed to return :
         - True if user authentication succeeds
         - False if user authentication fails
         - None if authentication can't be achieved (the plugin result is then ignored)
        :param session:
        :param listener: not used by this implementation
        :return: True if no plugin rejected the session, False otherwise
        """
        auth_plugins = None
        auth_config = self.config.get('auth', None)
        if auth_config:
            auth_plugins = auth_config.get('plugins', None)
        returns = yield from self.plugins_manager.map_plugin_coro(
            "authenticate",
            session=session,
            filter_plugins=auth_plugins)
        auth_result = True
        if returns:
            for plugin, res in returns.items():
                if res is False:
                    auth_result = False
                    make_log("Authentication failed due to '%s' plugin result: %s" % (plugin.name, res))
                else:
                    make_log("'%s' plugin result: %s" % (plugin.name, res))
        return auth_result

    def retain_message(self, source_session, topic_name, data, qos=None):
        """Store ``data`` as the retained message of ``topic_name``.

        An empty or ``None`` payload clears the retained message instead.
        """
        if data is not None and data != b'':
            # If retained flag set, store the message for further subscriptions
            make_log("Retaining message on topic %s" % topic_name)
            self._retained_messages[topic_name] = RetainedApplicationMessage(
                source_session, topic_name, data, qos)
        elif topic_name in self._retained_messages:
            # [MQTT-3.3.1-10]
            make_log("Clear retained messages for topic '%s'" % topic_name)
            del self._retained_messages[topic_name]

    def add_subscription(self, subscription, session):
        """Register ``session`` on a topic filter.

        :param subscription: ``(topic_filter, requested_qos)`` pair
        :param session: subscribing session
        :return: the granted QoS, or ``0x80`` if the filter is invalid
        """
        try:
            a_filter = subscription[0]
            levels = a_filter.split('/')
            last_index = len(levels) - 1
            for index, level in enumerate(levels):
                if '#' in level and (level != '#' or index != last_index):
                    # [MQTT-4.7.1-2] '#' must be a whole level and the last one
                    return 0x80
                if '+' in level and level != '+':
                    # [MQTT-4.7.1-3] + wildcard character must occupy entire level
                    return 0x80

            qos = subscription[1]
            if 'max-qos' in self.config and qos > self.config['max-qos']:
                qos = self.config['max-qos']
            if a_filter not in self._subscriptions:
                self._subscriptions[a_filter] = []
            already_subscribed = any(
                sub_session.client_id == session.client_id
                for (sub_session, _sub_qos) in self._subscriptions[a_filter])
            if not already_subscribed:
                self._subscriptions[a_filter].append((session, qos))
            else:
                make_log("Client %s has already subscribed to %s" %
                         (format_client_message(session=session), a_filter))
            return qos
        except KeyError:
            return 0x80

    def _del_subscription(self, a_filter, session):
        """
        Delete a session subscription on a given topic
        :param a_filter:
        :param session:
        :return: number of subscriptions removed (0 or 1)
        """
        subscriptions = self._subscriptions.get(a_filter)
        if subscriptions is None:
            # Unsubscribe topic not found in current subscribed topics
            return 0
        for index, (sub_session, _qos) in enumerate(subscriptions):
            if sub_session.client_id == session.client_id:
                make_log("Removing subscription on topic '%s' for client %s" %
                         (a_filter, format_client_message(session=session)))
                subscriptions.pop(index)
                return 1
        return 0

    def _del_all_subscriptions(self, session):
        """
        Delete all topic subscriptions for a given session
        :param session:
        :return:
        """
        # Collect first: the dict must not change size while it is iterated.
        touched = [topic for topic in self._subscriptions
                   if self._del_subscription(topic, session)]
        for topic in touched:
            if not self._subscriptions[topic]:
                del self._subscriptions[topic]

    def matches(self, topic, a_filter):
        """Tell whether ``topic`` matches the subscription filter ``a_filter``.

        Matching is done level by level: ``+`` matches exactly one level and a
        trailing ``#`` matches the parent level and everything below it. The
        previous regex version let ``+`` span several levels, only matched a
        prefix of the topic and rejected characters such as ``-``.

        :return: ``True`` or ``False``
        """
        if "#" not in a_filter and "+" not in a_filter:
            # if filter doesn't contain wildcard, return exact match
            return a_filter == topic
        filter_levels = a_filter.split('/')
        topic_levels = topic.split('/')
        for index, level in enumerate(filter_levels):
            if level == '#':
                return True
            if index >= len(topic_levels):
                return False
            if level != '+' and level != topic_levels[index]:
                return False
        return len(filter_levels) == len(topic_levels)

    @asyncio.coroutine
    def _broadcast_loop(self):
        """Consume the broadcast queue and deliver each message to its subscribers.

        Messages for sessions that are not connected are put on the session's
        retained queue. Runs until cancelled, then waits for publishes in flight.
        """
        running_tasks = deque()
        try:
            while True:
                # Drop finished publish tasks from the head of the queue.
                while running_tasks and running_tasks[0].done():
                    running_tasks.popleft()
                broadcast = yield from self._broadcast_queue.get()
                if self.logger.isEnabledFor(logging.DEBUG):
                    make_log("broadcasting %r" % broadcast)
                for k_filter in self._subscriptions:
                    if broadcast['topic'].startswith("$") and k_filter.startswith(("+", "#")):
                        make_log("[MQTT-4.7.2-1] - ignoring brodcasting $ topic to subscriptions starting with + or #")
                    elif self.matches(broadcast['topic'], k_filter):
                        for (target_session, qos) in self._subscriptions[k_filter]:
                            if 'qos' in broadcast:
                                qos = broadcast['qos']
                            if target_session.transitions.state == 'connected':
                                make_log("broadcasting application message from %s on topic '%s' to %s" %
                                         (format_client_message(session=broadcast['session']),
                                          broadcast['topic'], format_client_message(session=target_session)))
                                handler = self._get_handler(target_session)
                                task = ensure_future(
                                    handler.mqtt_publish(broadcast['topic'], broadcast['data'], qos, retain=False),
                                    loop=self._loop)
                                running_tasks.append(task)
                            else:
                                make_log("retaining application message from %s on topic '%s' to client '%s'" %
                                         (format_client_message(session=broadcast['session']),
                                          broadcast['topic'], format_client_message(session=target_session)))
                                retained_message = RetainedApplicationMessage(
                                    broadcast['session'], broadcast['topic'], broadcast['data'], qos)
                                yield from target_session.retained_messages.put(retained_message)
        except CancelledError:
            # Wait until current broadcasting tasks end
            if running_tasks:
                yield from asyncio.wait(running_tasks, loop=self._loop)

    def _log_topic_message(self, topic, data):
        """Best-effort: append the UTF-8 decoded payload to ``<topic>.txt``.

        A failure here never prevents the broadcast itself.
        """
        # NOTE(security): ``topic`` comes from clients and is used verbatim as
        # a file path, so a crafted topic can address files outside the
        # working directory. The path is left unchanged to keep existing log
        # locations; review before exposing the broker to untrusted clients.
        # NOTE(review): this is blocking file I/O inside a coroutine.
        try:
            text = data.decode('utf-8')
            st = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
            with open(topic + ".txt", "a") as f:
                f.write("[" + st + "]  " + text + '\n')
        except (OSError, UnicodeError, AttributeError, TypeError, ValueError) as exc:
            self.logger.debug("Could not log message on topic %r: %s", topic, exc)

    @asyncio.coroutine
    def _broadcast_message(self, session, topic, data, force_qos=None):
        """Log the payload to the per-topic file and queue it for broadcasting.

        :param force_qos: when not ``None``, QoS used for every subscriber
            (a forced QoS of 0 is now honoured too)
        """
        broadcast = {
            'session': session,
            'topic': topic,
            'data': data
        }
        if force_qos is not None:
            broadcast['qos'] = force_qos
        self._log_topic_message(topic, data)
        yield from self._broadcast_queue.put(broadcast)

    @asyncio.coroutine
    def publish_session_retained_messages(self, session):
        """Publish every message queued on ``session`` and wait for completion."""
        make_log("Publishing %d messages retained for session %s" %
                 (session.retained_messages.qsize(), format_client_message(session=session))
                 )
        publish_tasks = []
        handler = self._get_handler(session)
        while not session.retained_messages.empty():
            retained = yield from session.retained_messages.get()
            publish_tasks.append(ensure_future(
                handler.mqtt_publish(
                    retained.topic, retained.data, retained.qos, True), loop=self._loop))
        if publish_tasks:
            yield from asyncio.wait(publish_tasks, loop=self._loop)

    @asyncio.coroutine
    def publish_retained_messages_for_subscription(self, subscription, session):
        """Publish to ``session`` the retained messages matching a new subscription."""
        make_log("Begin broadcasting messages retained due to subscription on '%s' from %s" %
                 (subscription[0], format_client_message(session=session)))
        publish_tasks = []
        handler = self._get_handler(session)
        for d_topic, retained in self._retained_messages.items():
            make_log("matching : %s %s" % (d_topic, subscription[0]))
            if self.matches(d_topic, subscription[0]):
                make_log("%s and %s match" % (d_topic, subscription[0]))
                publish_tasks.append(ensure_future(
                    handler.mqtt_publish(
                        retained.topic, retained.data, subscription[1], True), loop=self._loop))
        if publish_tasks:
            yield from asyncio.wait(publish_tasks, loop=self._loop)
        make_log("End broadcasting messages retained due to subscription on '%s' from %s" %
                 (subscription[0], format_client_message(session=session)))

    def delete_session(self, client_id):
        """
        Delete an existing session data, for example due to clean session set in CONNECT
        :param client_id:
        :return:
        """
        entry = self._sessions.get(client_id)
        session = entry[0] if entry is not None else None
        if session is None:
            make_log("Delete session : session %s doesn't exist" % client_id)
            return

        # Delete subscriptions
        make_log("deleting session %s subscriptions" % repr(session))
        self._del_all_subscriptions(session)

        make_log("deleting existing session %s" % repr(self._sessions[client_id]))
        del self._sessions[client_id]

    def _get_handler(self, session):
        """Return the handler registered for ``session``, or ``None`` if there is none."""
        client_id = session.client_id
        if client_id:
            try:
                return self._sessions[client_id][1]
            except KeyError:
                pass
        # NOTE(review): the scraped listing is cut off right after this method
        # (next snippet on the page: table_test.py). The visible code falls
        # through to an implicit None, made explicit here - confirm against
        # the full source.
        return None
Source:table_test.py  
...31    '%(levelname)s'32    '\x1b[0m'33    ' '34    '%(message)s', _TIMESTAMP_FORMAT)35def make_log(**kwargs):36    """Create a LogLine instance."""37    # Construct a LogRecord38    attributes = dict(name='testlogger',39                      levelno=logging.INFO,40                      levelname='INF',41                      msg='[%s] %.3f %s',42                      args=('MOD1', 3.14159, 'Real message here'),43                      created=_TIMESTAMP_SAMPLE.timestamp(),44                      filename='test.py',45                      lineno=42,46                      pathname='/home/user/test.py')47    # Override any above attrs that are passed in.48    attributes.update(kwargs)49    # Create the record50    record = logging.makeLogRecord(dict(attributes))51    # Format52    formatted_message = formatter.format(record)53    log_line = LogLine(record=record,54                       formatted_log=formatted_message,55                       ansi_stripped_log='')56    log_line.update_metadata()57    return log_line58class TestTableView(unittest.TestCase):59    """Tests for rendering log lines into tables."""60    def setUp(self):61        # Show large diffs62        self.maxDiff = None  # pylint: disable=invalid-name63    @parameterized.expand([64        (65            'Correct column widths with all fields set',66            [67                make_log(68                    args=('M1', 1.2345, 'Something happened'),69                    extra_metadata_fields=dict(module='M1', time=12)),70                make_log(71                    args=('MD2', 567.5, 'Another cool event'),72                    extra_metadata_fields=dict(module='MD2', time=123)),73            ],74            dict(module=len('MD2'), time=len('123')),75        ),76        (77            'Missing metadata fields on some rows',78            [79                make_log(80                    args=('M1', 54321.2, 'Something happened'),81                    extra_metadata_fields=dict(module='M1', 
time=54321.2)),82                make_log(83                    args=('MOD2', 567.5, 'Another cool event'),84                    extra_metadata_fields=dict(module='MOD2')),85            ],86            dict(module=len('MOD2'), time=len('54321.200')),87        ),88    ]) # yapf: disable89    def test_column_widths(self, _name, logs, expected_widths) -> None:90        """Test colum widths calculation."""91        table = TableView()92        for log in logs:93            table.update_metadata_column_widths(log)94            # update_metadata_column_widths shoulp populate self.metadata.fields95            self.assertEqual(log.metadata.fields,96                             log.record.extra_metadata_fields)97        # Check expected column widths98        self.assertEqual(dict(table.column_widths), expected_widths)99    @parameterized.expand([100        (101            'Build header adding fields incrementally',102            [103                make_log(104                    args=('MODULE2', 567.5, 'Another cool event'),105                    extra_metadata_fields=dict(106                        # timestamp missing107                        module='MODULE2')),108                make_log(109                    args=('MODULE1', 54321.2, 'Something happened'),110                    extra_metadata_fields=dict(111                        # timestamp added in112                        module='MODULE1', timestamp=54321.2)),113            ],114            [115                [('bold', 'Time             '), _TABLE_PADDING,116                 ('bold', 'Lvl'), _TABLE_PADDING,117                 ('bold', 'Module   '),118                 ('bold', 'Message')],119                [('bold', 'Time             '), _TABLE_PADDING,120                 ('bold', 'Lvl'), _TABLE_PADDING,121                 # timestamp added in122                 ('bold', 'Timestamp  '),123                 ('bold', 'Module   '),124                 ('bold', 'Message')],125            ],126        ),127    ]) # 
yapf: disable128    def test_formatted_header(self, _name, logs, expected_headers) -> None:129        """Test colum widths calculation."""130        table = TableView()131        for log, header in zip(logs, expected_headers):132            table.update_metadata_column_widths(log)133            self.assertEqual(table.formatted_header(), header)134    @parameterized.expand([135        (136            'Build rows adding fields incrementally',137            [138                make_log(139                    args=('MODULE2', 567.5, 'Another cool event'),140                    extra_metadata_fields=dict(141                        # timestamp missing142                        module='MODULE2',143                        msg='Another cool event')),144                make_log(145                    args=('MODULE2', 567.5, 'Another cool event'),146                    extra_metadata_fields=dict(147                        # timestamp and msg missing148                        module='MODULE2')),149                make_log(150                    args=('MODULE1', 54321.2, 'Something happened'),151                    extra_metadata_fields=dict(152                        # timestamp added in153                        module='MODULE1', timestamp=54321.2,154                        msg='Something happened')),155            ],156            [157                [158                    ('class:log-time', _TIMESTAMP_SAMPLE_STRING),159                    _TABLE_PADDING,160                    ('class:log-level-20', 'INF'),161                    _TABLE_PADDING,162                    ('class:log-table-column-3', 'MODULE2  '),163                    ('', 'Another cool event'),...kadai02_URL_mynav.py
Source:kadai02_URL_mynav.py  
...35    else:36        driver = webdriver.Firefox(executable_path=GeckoDriverManager().install())37    return driver38#logåºå颿°39def make_log(txt):40    now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')41    logStr = '[%s:%s] %s' % ('log', now, txt)42    #ãã¡ã¤ã«ã«åºå43    with open(log_file_path, 'a', encoding='utf-8_sig') as f:44        f.write(logStr + '\n')45    print(logStr)46#URLã«ä»å ããæ¤ç´¢ãã¼ã¯ã¼ããçæãã颿°47def make_paramaters(words):48    word_list = words.split()49    num = len(word_list)50    count = 151    words_search = ''52    for word in word_list:53      if num == count:54        words_search = words_search + f'kw{word}'55      else:56        words_search = words_search + f'kw{word}_'57      count += 158    return words_search59def find_table_target_word(table_ele, word):60    th_list = table_ele.find_elements_by_css_selector("th")61    td_list = table_ele.find_elements_by_css_selector("td")62    target = ''63    for th, td in zip(th_list, td_list):64        if word == th.text:65            target=td.text66            break67        68    return target69# mainå¦ç70def main():71   # driverãèµ·å72    driver = set_driver(BROWSER,False)73    make_log('ãã©ã¦ã¶èµ·å')74    key_word_search = input('æ¤ç´¢ããããã¼ã¯ã¼ããå
¥åãã¦ãã ããã>>')  75  # Webãµã¤ããéã76    driver.get(TARGET_SITE)77    time.sleep(4)78  # ãããã¢ãããéãã79    driver.execute_script('document.querySelector(".karte-close").click()')80    time.sleep(4)81  # ãããã¢ãããéãã82    driver.execute_script('document.querySelector(".karte-close").click()')83  #æ¤ç´¢ãã¼ã¯ã¼ãããã©ã¡ã¼ã¿ã¨ãã¦URLãçæãã¦æ¤ç´¢ãã84    paramaters_search = make_paramaters(key_word_search)85    driver.get(TARGET_SITE + 'list/' + paramaters_search)86    make_log(f'æ¤ç´¢ãã¼ã¯ã¼ã({key_word_search})ãä»å ãURLãçæ')87  # æ¤ç´¢çµæ ä»¶æ°ãåå¾ã表示88    total_names = driver.find_element_by_css_selector(89      "span.js__searchRecruit--count").text90    # print(f'ãã¼ã¯ã¼ãã«è©²å½ãã伿¥æ°={total_names}社')91    make_log(f'æ¤ç´¢çµæå徿å:ãã¼ã¯ã¼ãã«è©²å½ãã伿¥æ°={total_names}社')92    page = 193    counts_companies = 194    df = pd.DataFrame()95    while True:96        contents = driver.find_elements_by_css_selector('.cassetteRecruit')97        # print(f'{page}ãã¼ã¸ç®ã®ä¼æ¥æ°={len(contents)}')98        make_log(f'{page}ãã¼ã¸ç®ã®ä¼æ¥æ°={len(contents)}')99        for content in contents:100            try:101                name_catch =content.find_element_by_css_selector("h3").text102                name_catch = name_catch.strip().split('|')103                104                if(len(name_catch) > 1):105                    name = name_catch[0]106                    company_catch = name_catch[1]107                else:108                    name = name_catch[0]109                    company_catch = ''110                title = content.find_element_by_css_selector(111                ".cassetteRecruit__copy > a").text112                link = content.find_element_by_css_selector(113                ".cassetteRecruit__copy > a").get_attribute("href")114                update_date = content.find_element_by_css_selector(115                ".cassetteRecruit__updateDate > span").text116                table = content.find_element_by_css_selector(".tableCondition")117                salary1styear 
= find_table_target_word(table,"å年度年å")118                df = df.append({119                    'No.': counts_companies,120                    '伿¥å': name,121                    'åéå
容': title,122                    'æ
å ±æ´æ°æ¥': update_date,123                    'åéå
容詳細': link,124                    '伿¥ç´¹ä»': company_catch,125                    'å年度年å': salary1styear126                }, ignore_index=True)127                make_log(f'{counts_companies}ç¤¾ç® æ
å ±å徿å')128            except Exception as e:129                make_log(f'{counts_companies}ç¤¾ç® æ
å ±åå¾å¤±æ')130                make_log(traceback.format_exc())131            finally:132                counts_companies += 1133        #次ãã¼ã¸ã¸é·ç§»134        element_click = driver.find_elements_by_css_selector(135            ".iconFont--arrowLeft")136        if len(element_click) >= 1:137            page += 1138            driver.execute_script(139                "arguments[0].scrollIntoView(true);", element_click[0])140            element_click[0].click()141            make_log(f'{page}ãã¼ã¸ç®ã¸é·ç§»')142        else:143            print("æ¤ç´¢çµæãå
¨ã¦åå¾ãã¾ããã")144            driver.close()145            break146    147    # çµæãcsvãã¡ã¤ã«ã«ä¿å148    today = datetime.datetime.now().strftime('%Y-%m-%d')149    file_name_path = './results/' + \150        f'æ¤ç´¢çµæ(ãã¼ã¯ã¼ã={key_word_search})_{today}'+'.csv'151    df.to_csv(file_name_path, encoding="utf-8_sig", index=False)152    make_log(f'ãã¡ã¤ã«ã«ä¿å_ãã¡ã¤ã«åãæ¤ç´¢çµæ(ãã¼ã¯ã¼ã={key_word_search}).csvã')153    154# ç´æ¥èµ·åãããå ´åã¯main()ãèµ·å(ã¢ã¸ã¥ã¼ã«ã¨ãã¦å¼ã³åºãããå ´åã¯èµ·åããªãããã«ãããã)155if __name__ == "__main__":...kadai02_mynav.py
Source:kadai02_mynav.py  
...34    else:35        driver = webdriver.Firefox(executable_path=GeckoDriverManager().install())36    return driver37#logåºå颿°38def make_log(txt):39    now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')40    logStr = '[%s:%s] %s' %('log', now, txt)41    #ãã¡ã¤ã«ã«åºå42    with open(log_file_path, 'a', encoding='utf-8_sig') as f:43        f.write(logStr + '\n')44    print(logStr)45    # mainå¦ç46def main():47   # driverãèµ·å48    driver = set_driver(BROWSER,False)49    make_log('ãã©ã¦ã¶èµ·å')50    key_word_search = input('æ¤ç´¢ããããã¼ã¯ã¼ããå
¥åãã¦ãã ããã>>')  51  # Webãµã¤ããéã52    driver.get(TARGET_SITE)53    time.sleep(4)54  # ãããã¢ãããéãã55    driver.execute_script('document.querySelector(".karte-close").click()')56    time.sleep(4)57  # ãããã¢ãããéãã58    driver.execute_script('document.querySelector(".karte-close").click()')59  #ãã¼ã¯ã¼ããæ¤ç´¢çªã«å
¥åãã60    driver.find_element_by_css_selector(61      "input.topSearch__text").send_keys(key_word_search)62    make_log(f'ãã¼ã¯ã¼ã({key_word_search})ãæ¤ç´¢çªã«å
¥å')63  # æ¤ç´¢ãã¿ã³ã¯ãªãã¯64    driver.find_element_by_class_name("topSearch__button").click()65  # æ¤ç´¢çµæ ä»¶æ°ãåå¾ã表示66    total_names = driver.find_element_by_css_selector(67      "span.js__searchRecruit--count").text68    print(f'ãã¼ã¯ã¼ãã«è©²å½ãã伿¥æ°={total_names}社')69    make_log(f'æ¤ç´¢çµæå徿å:ãã¼ã¯ã¼ãã«è©²å½ãã伿¥æ°={total_names}社')70    page = 171    counts_companies = 172    df = pd.DataFrame()73    while True:74        contents = driver.find_elements_by_css_selector('.cassetteRecruit')75        print(f'{page}ãã¼ã¸ç®ã®ä¼æ¥æ°={len(contents)}')76        make_log(f'{page}ãã¼ã¸ç®ã®ä¼æ¥æ°={len(contents)}')77        for content in contents:78            try:79                #伿¥åã¨ä¼æ¥ãã£ããã³ãã¼ããªã¹ãã«æ ¼ç´âããããã夿°ã«åãåºã80                name_catch =content.find_element_by_css_selector("h3").text81                name_catch = name_catch.strip().split('|')82                if(len(name_catch) > 1):83                    name = name_catch[0]84                    company_catch = name_catch[1]85                else:86                    name = name_catch[0]87                    company_catch=''88                #æ±äººä»¶åã詳細URLãæ´æ°æ¥ãåå¾89                title = content.find_element_by_css_selector(90                    ".cassetteRecruit__copy > a").text91                link = content.find_element_by_css_selector(92                    ".cassetteRecruit__copy > a").get_attribute("href")93                update_date = content.find_element_by_css_selector(94                    ".cassetteRecruit__updateDate > span").text95                # print(counts_companies, name,update_date,link)96                #æ
å ±ããã¼ã¿ãã¬ã¼ã ã«æ ¼ç´97                df = df.append({98                    'No.': counts_companies,99                    '伿¥å': name,100                    'åéå
容': title,101                    'æ
å ±æ´æ°æ¥': update_date,102                    'åéå
容詳細': link,103                    '伿¥ç´¹ä»': company_catch104                }, ignore_index=True)105                make_log(f'{counts_companies}ç¤¾ç® æ
å ±å徿å')106            except Exception as e:107                make_log(f'{counts_companies}ç¤¾ç® æ
å ±åå¾å¤±æ')108                make_log(traceback.format_exc())109            finally:110                counts_companies += 1111        #次ãã¼ã¸ã¸é·ç§»112        element_click = driver.find_elements_by_css_selector(113            ".iconFont--arrowLeft")114        if len(element_click) >=1:115            page += 1116            driver.execute_script(117                "arguments[0].scrollIntoView(true);", element_click[0])118            element_click[0].click()119            make_log(f'{page}ãã¼ã¸ç®ã¸é·ç§»')120        else:121            print("æ¤ç´¢çµæãå
¨ã¦åå¾ãã¾ããã")122            driver.close()123            break124    # çµæãcsvãã¡ã¤ã«ã«ä¿å125    today = datetime.datetime.now().strftime('%Y-%m-%d')126    file_name_path = './results/' + f'æ¤ç´¢çµæ(ãã¼ã¯ã¼ã={key_word_search})_{today}'+'.csv'127    df.to_csv(file_name_path, encoding="utf-8_sig", index=False)128    make_log(f'ãã¡ã¤ã«ã«ä¿å_ãã¡ã¤ã«åãæ¤ç´¢çµæ(ãã¼ã¯ã¼ã={key_word_search}).csvã')129    130# ç´æ¥èµ·åãããå ´åã¯main()ãèµ·å(ã¢ã¸ã¥ã¼ã«ã¨ãã¦å¼ã³åºãããå ´åã¯èµ·åããªãããã«ãããã)131if __name__ == "__main__":...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
