Best Python code snippet using molotov_python
database.py
Source:database.py  
1from mysql.connector import errorcode2from utilities.utils import *3from utilities.enums import *4import mysql.connector5DATABASE_ERROR = -16class Database:7    def __init__(self, host, username, password, database, port):8        self.host = host9        self.username = username10        self.password = password11        self.db_name = database12        self.port = port13        self.cursor = None14        self.connection = None15    def connect(self):16        try:17            self.connection = mysql.connector.connect(user=self.username,18                                                      password=self.password,19                                                      host=self.host,20                                                      database=self.db_name,21                                                      port=self.port22                                                      )23            return self.connection24        except mysql.connector.Error as err:25            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:26                return DatabaseErrors.ACCESS_DENIED27            elif err.errno == errorcode.ER_BAD_DB_ERROR:28                return DatabaseErrors.DB_EXCEPTION29        except mysql.connector.errors.OperationalError:30            return DatabaseErrors.CONNECTION_LOST31    def get_cursor(self):32        self.cursor = self.connection.cursor()33    def shut_down(self):34        self.cursor.close()35        self.connection.close()36    def get_admin_info(self, email):37        try:38            self.cursor.execute(f"""39                                    SELECT first_name, email, psw 40                                    FROM person JOIN administrator ON person.id = administrator.person_id41                                    WHERE person.email = '{email}'42                                """)43            return self.cursor.fetchone()44        except mysql.connector.errors.OperationalError:45            return DatabaseErrors.CONNECTION_LOST46        except mysql.connector.errors.InterfaceError:47            return DatabaseErrors.CONNECTION_LOST48    def add_product(self, values):49        query = "INSERT INTO product (product_name, price, qnt) VALUES (%s, %s, %s);"50        args = (values[0], f'{values[1]}', values[2])51        # try to execute the query52        try:53            self.cursor.execute(query, args)54        except mysql.connector.errors.IntegrityError:55            return DatabaseErrors.NAME_ALREADY_EXIST56        except mysql.connector.errors.OperationalError:57            return DatabaseErrors.CONNECTION_LOST58        except mysql.connector.errors.DataError:59            return DatabaseErrors.DATA_ERROR60        except mysql.connector.errors.InterfaceError:61            return DatabaseErrors.CONNECTION_LOST62        self.connection.commit()63    def add_person(self, values):64        query = "INSERT INTO person (first_name, last_name, email, psw, money) VALUES (%s, %s, %s, %s, %s);"65        args = (values[0], values[1], values[2], values[3], f'{values[4]}')66        try:67            self.cursor.execute(query, args)68        except mysql.connector.errors.IntegrityError:69            return DatabaseErrors.EMAIL_ALREADY_EXIST70        except mysql.connector.errors.OperationalError:71            return DatabaseErrors.CONNECTION_LOST72        except mysql.connector.errors.DataError:73            return DatabaseErrors.DATA_ERROR74        except mysql.connector.errors.InterfaceError:75            return DatabaseErrors.CONNECTION_LOST76        self.connection.commit()77    def add_customer(self, customer_email):78        try:79            self.cursor.execute(f"SELECT id FROM person WHERE email = '{customer_email}'")80            id_ = self.cursor.fetchone()81            self.cursor.execute(f"INSERT INTO customer (person_id) VALUES ({id_[0]});")82        except mysql.connector.OperationalError:83            return DatabaseErrors.CONNECTION_LOST84        except mysql.connector.errors.InterfaceError:85            return DatabaseErrors.CONNECTION_LOST86        self.connection.commit()87    def get_customers(self, limit):88        try:89            self.cursor.execute(f'''SELECT customer.id, first_name, last_name, email, psw, money90                               FROM customer JOIN person91                               ON customer.person_id = person.id LIMIT {limit}92                          ''')93            return self.cursor.fetchall()94        except mysql.connector.errors.OperationalError:95            return DatabaseErrors.CONNECTION_LOST96        except mysql.connector.errors.InterfaceError:97            return DatabaseErrors.CONNECTION_LOST98    def get_customer_info(self, email):99        try:100            self.cursor.execute(f"""SELECT email, psw, first_name, last_name, person.id, money101                               FROM ecommerce.customer JOIN person ON customer.person_id = person.id102                               WHERE email = '{email}';103                           """)104            return self.cursor.fetchone()105        except mysql.connector.errors.OperationalError:106            return DatabaseErrors.CONNECTION_LOST107        except mysql.connector.errors.InterfaceError:108            return DatabaseErrors.CONNECTION_LOST109    def get_products(self, limit):110        try:111            self.cursor.execute(f"SELECT * FROM product LIMIT {limit}")112            return self.cursor.fetchall()113        except mysql.connector.errors.OperationalError:114            return DatabaseErrors.CONNECTION_LOST115        except mysql.connector.errors.InterfaceError:116            return DatabaseErrors.CONNECTION_LOST117    def get_super_root(self):118        try:119            self.cursor.execute("""SELECT email, psw FROM person120                              JOIN administrator ON administrator.person_id = person.id121                              WHERE administrator.id = 1"""122                                )123            return self.cursor.fetchone()124        except mysql.connector.errors.OperationalError:125            return DatabaseErrors.CONNECTION_LOST126    def get_product_bought(self, prod_name):127        try:128            self.cursor.execute(f"SELECT * FROM product WHERE product.product_name LIKE '{prod_name}';")129            return self.cursor.fetchone()130        except mysql.connector.errors.OperationalError:131            return DatabaseErrors.CONNECTION_LOST132        except mysql.connector.errors.InterfaceError:133            return DatabaseErrors.CONNECTION_LOST134    def get_product_searched(self, prod_name, limit):135        try:136            self.cursor.execute(f"SELECT * FROM product WHERE product.product_name LIKE '%{prod_name}%' LIMIT {limit}")137            return self.cursor.fetchall()138        except mysql.connector.errors.OperationalError:139            return DatabaseErrors.CONNECTION_LOST140    def delete_product(self, id_):141        try:142            self.cursor.execute(f"DELETE FROM product WHERE id = {id_}")143        except mysql.connector.errors.OperationalError:144            return DatabaseErrors.CONNECTION_LOST145        except mysql.connector.errors.InterfaceError:146            return DatabaseErrors.CONNECTION_LOST147        self.connection.commit()148    def rmv_qnt_product(self, prod_name):149        try:150            product = self.get_product_bought(prod_name)151            if product[3] <= 0:  # if the quantity is equal to 0152                return DatabaseErrors.OUT_OF_STOCK153            self.cursor.execute(f"UPDATE product SET qnt = {product[3] - 1} WHERE id = {product[0]};")154        except mysql.connector.errors.InterfaceError:155            return DatabaseErrors.CONNECTION_LOST156        self.connection.commit()157    def delete_customer(self, id_):158        try:159            self.cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")160            self.connection.commit()161            self.cursor.execute(f"""SELECT customer.person_id FROM customer162                                       JOIN person ON person.id = customer.person_id WHERE customer.id = {id_};""")163            person_id = self.cursor.fetchone()164            self.cursor.execute(f"DELETE FROM customer WHERE customer.id = {id_};")165            self.cursor.execute(f"""DELETE FROM person WHERE person.id = {person_id[0]};""")166            self.connection.commit()167        except mysql.connector.errors.OperationalError:168            return DatabaseErrors.CONNECTION_LOST169        except mysql.connector.errors.InterfaceError:170            return DatabaseErrors.CONNECTION_LOST171        self.connection.commit()172    def get_customer_searched(self, customer_name, limit):173        try:174            self.cursor.execute(f"""SELECT person.id, first_name, last_name, email, psw, money175                               FROM person JOIN customer ON person.id = customer.person_id176                               WHERE person.first_name LIKE '%{customer_name}%' LIMIT {limit}""")177            return self.cursor.fetchall()178        except mysql.connector.errors.OperationalError:179            return DatabaseErrors.CONNECTION_LOST180        except mysql.connector.errors.InterfaceError:181            return DatabaseErrors.CONNECTION_LOST182    def customer_change_money(self, credit, person_id):183        try:184            self.cursor.execute(f"UPDATE person SET person.money = {credit} WHERE person.id = {person_id}")185        except mysql.connector.errors.OperationalError:186            return False187        except mysql.connector.errors.InterfaceError:188            return DatabaseErrors.CONNECTION_LOST189        self.connection.commit()190        return True191    def create_my_order(self, customer_id):192        try:193            self.cursor.execute(f"INSERT INTO my_order (customer_id) VALUES ({customer_id});")194        except mysql.connector.errors.OperationalError:195            return DatabaseErrors.CONNECTION_LOST196        except mysql.connector.errors.InterfaceError:197            return DatabaseErrors.CONNECTION_LOST198        self.connection.commit()199    def create_prod_ordered(self, prod_id, customer_id):200        try:201            order_id = self.get_last_order(customer_id)202            sql = "INSERT INTO product_ordered(product_id, order_id, date_) VALUES (%s, %s, %s);"203            args = (f'{prod_id}', f'{order_id[0]}', get_date())204            self.cursor.execute(sql, args)205        except mysql.connector.errors.OperationalError:206            return DatabaseErrors.CONNECTION_LOST207        self.connection.commit()208    def get_customer_id(self, person_id):209        try:210            self.cursor.execute(f"""SELECT customer.id FROM customer211                               JOIN person ON person.id = customer.person_id212                               WHERE person.id = {person_id}213                           """)214            return self.cursor.fetchone()215        except mysql.connector.errors.OperationalError:216            return DatabaseErrors.CONNECTION_LOST217        except mysql.connector.errors.InterfaceError:218            return DatabaseErrors.CONNECTION_LOST219    def get_last_order(self, customer_id):220        try:221            self.cursor.execute(222                f"SELECT id FROM my_order WHERE my_order.customer_id = {customer_id} ORDER BY id DESC LIMIT 1")223            return self.cursor.fetchone()224        except mysql.connector.errors.OperationalError:225            return DatabaseErrors.CONNECTION_LOST226    def get_orders(self, customer_id, limit):227        try:228            self.cursor.execute(f"""SELECT person.first_name,229	                                       person.last_name,230	                                       product.product_name,231                                           product_ordered.date_,232	                                       my_order.id, product.id233                                      234                                    FROM my_order235	                                    JOIN customer ON my_order.customer_id = customer.id236                                        JOIN person ON person.id = customer.person_id237	                                    JOIN product_ordered ON product_ordered.order_id = my_order.id238                                        JOIN product ON product.id = product_ordered.product_id239                                    240                                    WHERE customer_id = {customer_id}241                                    LIMIT {limit}242                               """)243            return self.cursor.fetchall()244        except mysql.connector.errors.OperationalError:245            return DatabaseErrors.CONNECTION_LOST246        except mysql.connector.errors.InterfaceError:247            return DatabaseErrors.CONNECTION_LOST248        except TypeError:249            return DatabaseErrors.CONNECTION_LOST250    def get_orders_searched(self, customer_id, prod_name, limit):251        try:252            self.cursor.execute(f'''SELECT person.first_name,253	                                  person.last_name,254	                                  product.product_name,255                                      product_ordered.date_,256	                                  my_order.id,257	                                  product.id258                                      259                               FROM my_order260	                               JOIN customer ON my_order.customer_id = customer.id261                                   JOIN person ON person.id = customer.person_id262	                               JOIN product_ordered ON product_ordered.order_id = my_order.id263                                   JOIN product ON product.id = product_ordered.product_id264                                   265                               WHERE customer_id = {customer_id} AND product.product_name LIKE "%{prod_name}%"266                               LIMIT {limit};''')267            return self.cursor.fetchall()268        except mysql.connector.errors.OperationalError:269            return DatabaseErrors.CONNECTION_LOST270        except mysql.connector.errors.InterfaceError:271            return DatabaseErrors.CONNECTION_LOST272    def delete_order(self, order_id):273        try:274            self.cursor.execute('''SET FOREIGN_KEY_CHECKS = 0;''')275            self.connection.commit()276            self.cursor.execute(f'''DELETE FROM my_order WHERE my_order.id = {order_id};''')277            self.connection.commit()278            self.cursor.execute(f'''DELETE FROM product_ordered WHERE product_ordered.order_id = {order_id};''')279            self.connection.commit()280        except mysql.connector.errors.OperationalError:281            return DatabaseErrors.CONNECTION_LOST282        except mysql.connector.errors.InterfaceError:283            return DatabaseErrors.CONNECTION_LOST284    def update_qnt_product(self, prod_id, new_qnt):285        try:286            self.cursor.execute(f'''UPDATE product SET product.qnt = "{new_qnt}" WHERE product.id = {prod_id}''')287        except mysql.connector.errors.OperationalError:288            return DatabaseErrors.CONNECTION_LOST289        except mysql.connector.errors.DataError:290            return DatabaseErrors.DATA_ERROR291        except mysql.connector.errors.InterfaceError:292            return DatabaseErrors.CONNECTION_LOST293        self.connection.commit()294    def update_name_product(self, prod_id, new_name):295        try:296            self.cursor.execute(f'''297                                    UPDATE product SET product.product_name = "{new_name}"298                                    WHERE product.id = {prod_id}299                                ''')300        except mysql.connector.errors.OperationalError:301            return DatabaseErrors.CONNECTION_LOST302        except mysql.connector.errors.DataError:303            return DatabaseErrors.DATA_ERROR304        except mysql.connector.errors.IntegrityError:305            return DatabaseErrors.NAME_ALREADY_EXIST306        except mysql.connector.errors.InterfaceError:307            return DatabaseErrors.CONNECTION_LOST308        self.connection.commit()309    def update_price_product(self, prod_id, new_price):310        try:311            self.cursor.execute(f'''UPDATE product SET product.price = "{new_price}" WHERE product.id = {prod_id}''')312        except mysql.connector.errors.OperationalError:313            return DatabaseErrors.CONNECTION_LOST314        except mysql.connector.errors.InterfaceError:315            return DatabaseErrors.CONNECTION_LOST316        self.connection.commit()317    def get_last_person(self):318        try:319            self.cursor.execute("SELECT person.id FROM person ORDER BY person.id DESC LIMIT 1")320            return self.cursor.fetchone()321        except mysql.connector.errors.OperationalError:322            return DatabaseErrors.CONNECTION_LOST323        except mysql.connector.errors.InterfaceError:324            return DatabaseErrors.CONNECTION_LOST325    def add_root(self):326        try:327            last_person_id = self.get_last_person()328            self.cursor.execute(f"""INSERT INTO administrator(person_id) VALUES ('{last_person_id[0]}');""")329        except mysql.connector.OperationalError:330            return DatabaseErrors.CONNECTION_LOST331        except mysql.connector.IntegrityError:332            return DatabaseErrors.EMAIL_ALREADY_EXIST333        except mysql.connector.errors.InterfaceError:334            return DatabaseErrors.CONNECTION_LOST335        self.connection.commit()336    def get_admins(self, limit):337        try:338            self.cursor.execute(f"""SELECT administrator.id,339                                      person.first_name,340                                      person.last_name,341                                      person.email,342                                      person.psw,343                                      person.money344                                FROM ecommerce.administrator JOIN person ON person.id = administrator.person_id345                                LIMIT {limit}346                           """)347            return self.cursor.fetchall()348        except mysql.connector.OperationalError:349            return DatabaseErrors.CONNECTION_LOST350        except mysql.connector.errors.InterfaceError:351            return DatabaseErrors.CONNECTION_LOST352    def get_person_id_super_root(self, id_):353        try:354            self.cursor.execute(f"""SELECT administrator.person_id355                               FROM ecommerce.administrator JOIN person ON person.id = administrator.person_id356                               WHERE administrator.id = {id_}357                           """)358            return self.cursor.fetchone()359        except mysql.connector.OperationalError:360            return DatabaseErrors.CONNECTION_LOST361        except mysql.connector.errors.InterfaceError:362            return DatabaseErrors.CONNECTION_LOST363    def delete_admin(self, admin_id):364        try:365            id_ = self.get_person_id_super_root(admin_id)366            if id_ is None:367                return DatabaseErrors.NO_ADMIN_FOUND368            self.cursor.execute("SET FOREIGN_KEY_CHECKS = 0")369            self.connection.commit()370            self.cursor.execute(f"DELETE FROM person WHERE person.id = {id_[0]};")371            self.connection.commit()372            self.cursor.execute(f"DELETE FROM administrator WHERE administrator.person_id = {id_[0]};")373            self.connection.commit()374        except mysql.connector.errors.OperationalError:375            return DatabaseErrors.CONNECTION_LOST376        except mysql.connector.errors.InterfaceError:377            return DatabaseErrors.CONNECTION_LOST378    def get_admin_searched(self, admin_name, limit):379        try:380            self.cursor.execute(f"""SELECT administrator.id,381                                      person.first_name,382                                      person.last_name,383                                      person.email,384                                      person.psw,385                                      person.money386                                      387                               FROM administrator388                               389                               JOIN person ON person.id = administrator.person_id390                               391                               WHERE person.first_name LIKE '%{admin_name}%' LIMIT {limit}392                           """)393            return self.cursor.fetchall()394        except mysql.connector.errors.OperationalError:395            return DatabaseErrors.CONNECTION_LOST396    def update_person_first_name(self, new_name, person_id):397        try:398            self.cursor.execute(f"UPDATE person SET person.first_name = '{new_name}' WHERE person.id = {person_id[0]};")399        except mysql.connector.OperationalError:400            return DatabaseErrors.CONNECTION_LOST401        except mysql.connector.errors.DataError:402            return DatabaseErrors.DATA_ERROR403        except mysql.connector.errors.InterfaceError:404            return DatabaseErrors.CONNECTION_LOST405        self.connection.commit()406    def update_person_last_name(self, new_name, person_id):407        try:408            self.cursor.execute(f"UPDATE person SET person.last_name = '{new_name}' WHERE person.id = {person_id[0]};")409        except mysql.connector.OperationalError:410            return DatabaseErrors.CONNECTION_LOST411        except mysql.connector.errors.DataError:412            return DatabaseErrors.DATA_ERROR413        except mysql.connector.errors.InterfaceError:414            return DatabaseErrors.CONNECTION_LOST415        self.connection.commit()416    def update_person_email(self, new_email, person_id):417        try:418            self.cursor.execute(f"UPDATE person SET person.email = '{new_email}' WHERE person.id = {person_id[0]};")419        except mysql.connector.errors.OperationalError:420            return DatabaseErrors.CONNECTION_LOST421        except mysql.connector.errors.IntegrityError:422            return DatabaseErrors.EMAIL_ALREADY_EXIST423        except mysql.connector.errors.InterfaceError:424            return DatabaseErrors.CONNECTION_LOST425        self.connection.commit()426    def update_person_password(self, new_psw, person_id):427        try:428            self.cursor.execute(f"UPDATE person SET person.psw = '{new_psw}' WHERE person.id = {person_id[0]};")429        except mysql.connector.errors.OperationalError:430            return DatabaseErrors.CONNECTION_LOST431        except mysql.connector.errors.InterfaceError:432            return DatabaseErrors.CONNECTION_LOST433        self.connection.commit()434    def update_person_money(self, new_money, person_id):435        try:436            self.cursor.execute(f"UPDATE person SET person.money = '{new_money}' WHERE person.id = {person_id[0]}")437        except mysql.connector.errors.OperationalError:438            return DatabaseErrors439        except mysql.connector.errors.InterfaceError:440            return DatabaseErrors.CONNECTION_LOST...__init__.py
Source:__init__.py  
...20    def connection_made(self, transport):21        """Called when reader thread is started"""22    def data_received(self, data):23        """Called with snippets received from the serial port"""24    def connection_lost(self, exc):25        """\26        Called when the serial port is closed or the reader loop terminated27        otherwise.28        """29        if isinstance(exc, Exception):30            raise exc31class Packetizer(Protocol):32    """33    Read binary packets from serial port. Packets are expected to be terminated34    with a TERMINATOR byte (null byte by default).35    The class also keeps track of the transport.36    """37    TERMINATOR = b'\0'38    def __init__(self):39        self.buffer = bytearray()40        self.transport = None41    def connection_made(self, transport):42        """Store transport"""43        self.transport = transport44    def connection_lost(self, exc):45        """Forget transport"""46        self.transport = None47        super(Packetizer, self).connection_lost(exc)48    def data_received(self, data):49        """Buffer received data, find TERMINATOR, call handle_packet"""50        self.buffer.extend(data)51        while self.TERMINATOR in self.buffer:52            packet, self.buffer = self.buffer.split(self.TERMINATOR, 1)53            self.handle_packet(packet)54    def handle_packet(self, packet):55        """Process packets - to be overridden by subclassing"""56        raise NotImplementedError('please implement functionality in handle_packet')57class FramedPacket(Protocol):58    """59    Read binary packets. Packets are expected to have a start and stop marker.60    The class also keeps track of the transport.61    """62    START = b'('63    STOP = b')'64    def __init__(self):65        self.packet = bytearray()66        self.in_packet = False67        self.transport = None68    def connection_made(self, transport):69        """Store transport"""70        self.transport = transport71    def connection_lost(self, exc):72        """Forget transport"""73        self.transport = None74        self.in_packet = False75        del self.packet[:]76        super(FramedPacket, self).connection_lost(exc)77    def data_received(self, data):78        """Find data enclosed in START/STOP, call handle_packet"""79        for byte in serial.iterbytes(data):80            if byte == self.START:81                self.in_packet = True82            elif byte == self.STOP:83                self.in_packet = False84                self.handle_packet(bytes(self.packet)) # make read-only copy85                del self.packet[:]86            elif self.in_packet:87                self.packet.extend(byte)88            else:89                self.handle_out_of_packet_data(byte)90    def handle_packet(self, packet):91        """Process packets - to be overridden by subclassing"""92        raise NotImplementedError('please implement functionality in handle_packet')93    def handle_out_of_packet_data(self, data):94        """Process data that is received outside of packets"""95        pass96class LineReader(Packetizer):97    """98    Read and write (Unicode) lines from/to serial port.99    The encoding is applied.100    """101    TERMINATOR = b'\r\n'102    ENCODING = 'utf-8'103    UNICODE_HANDLING = 'replace'104    def handle_packet(self, packet):105        self.handle_line(packet.decode(self.ENCODING, self.UNICODE_HANDLING))106    def handle_line(self, line):107        """Process one line - to be overridden by subclassing"""108        raise NotImplementedError('please implement functionality in handle_line')109    def write_line(self, text):110        """111        Write text to the transport. ``text`` is a Unicode string and the encoding112        is applied before sending ans also the newline is append.113        """114        # + is not the best choice but bytes does not support % or .format in py3 and we want a single write call115        self.transport.write(text.encode(self.ENCODING, self.UNICODE_HANDLING) + self.TERMINATOR)116class ReaderThread(threading.Thread):117    """\118    Implement a serial port read loop and dispatch to a Protocol instance (like119    the asyncio.Protocol) but do it with threads.120    Calls to close() will close the serial port but it is also possible to just121    stop() this thread and continue the serial port instance otherwise.122    """123    def __init__(self, serial_instance, protocol_factory):124        """\125        Initialize thread.126        Note that the serial_instance' timeout is set to one second!127        Other settings are not changed.128        """129        super(ReaderThread, self).__init__()130        self.daemon = True131        self.serial = serial_instance132        self.protocol_factory = protocol_factory133        self.alive = True134        self._lock = threading.Lock()135        self._connection_made = threading.Event()136        self.protocol = None137    def stop(self):138        """Stop the reader thread"""139        self.alive = False140        if hasattr(self.serial, 'cancel_read'):141            self.serial.cancel_read()142        self.join(2)143    def run(self):144        """Reader loop"""145        if not hasattr(self.serial, 'cancel_read'):146            self.serial.timeout = 1147        self.protocol = self.protocol_factory()148        try:149            self.protocol.connection_made(self)150        except Exception as e:151            self.alive = False152            self.protocol.connection_lost(e)153            self._connection_made.set()154            return155        error = None156        self._connection_made.set()157        while self.alive and self.serial.is_open:158            try:159                # read all that is there or wait for one byte (blocking)160                data = self.serial.read(self.serial.in_waiting or 1)161            except serial.SerialException as e:162                # probably some I/O problem such as disconnected USB serial163                # adapters -> exit164                error = e165                break166            else:167                if data:168                    # make a separated try-except for called user code169                    try:170                        self.protocol.data_received(data)171                    except Exception as e:172                        error = e173                        break174        self.alive = False175        self.protocol.connection_lost(error)176        self.protocol = None177    def write(self, data):178        """Thread safe writing (uses lock)"""179        with self._lock:180            return self.serial.write(data)181    def close(self):182        """Close the serial port and exit reader thread (uses lock)"""183        # use the lock to let other threads finish writing184        with self._lock:185            # first stop reading, so that closing can be done on idle port186            self.stop()187            self.serial.close()188    def connect(self):189        """190        Wait until connection is set up and return the transport and protocol191        instances.192        """193        if self.alive:194            self._connection_made.wait()195            if not self.alive:196                raise RuntimeError('connection_lost already called')197            return (self, self.protocol)198        else:199            raise RuntimeError('already stopped')200    # - -  context manager, returns protocol201    def __enter__(self):202        """\203        Enter context handler. May raise RuntimeError in case the connection204        could not be created.205        """206        self.start()207        self._connection_made.wait()208        if not self.alive:209            raise RuntimeError('connection_lost already called')210        return self.protocol211    def __exit__(self, exc_type, exc_val, exc_tb):212        """Leave context: close port"""213        self.close()214# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -215# test216if __name__ == '__main__':217    # pylint: disable=wrong-import-position218    import sys219    import time220    import traceback221    #~ PORT = 'spy:///dev/ttyUSB0'222    PORT = 'loop://'223    class PrintLines(LineReader):224        def connection_made(self, transport):225            super(PrintLines, self).connection_made(transport)226            sys.stdout.write('port opened\n')227            self.write_line('hello world')228        def handle_line(self, data):229            sys.stdout.write('line received: {!r}\n'.format(data))230        def connection_lost(self, exc):231            if exc:232                traceback.print_exc(exc)233            sys.stdout.write('port closed\n')234    ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)235    with ReaderThread(ser, PrintLines) as protocol:236        protocol.write_line('hello')237        time.sleep(2)238    # alternative usage239    ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)240    t = ReaderThread(ser, PrintLines)241    t.start()242    transport, protocol = t.connect()243    protocol.write_line('hello')244    time.sleep(2)...fdesc.py
Source:fdesc.py  
1# -*- test-case-name: twisted.test.test_fdesc -*-2# Copyright (c) 2001-2009 Twisted Matrix Laboratories.3# See LICENSE for details.4"""5Utility functions for dealing with POSIX file descriptors.6"""7import os8import errno9try:10    import fcntl11except ImportError:12    fcntl = None13# twisted imports14from twisted.internet.main import CONNECTION_LOST, CONNECTION_DONE15from twisted.python.runtime import platformType16def setNonBlocking(fd):17    """18    Make a file descriptor non-blocking.19    """20    flags = fcntl.fcntl(fd, fcntl.F_GETFL)21    flags = flags | os.O_NONBLOCK22    fcntl.fcntl(fd, fcntl.F_SETFL, flags)23def setBlocking(fd):24    """25    Make a file descriptor blocking.26    """27    flags = fcntl.fcntl(fd, fcntl.F_GETFL)28    flags = flags & ~os.O_NONBLOCK29    fcntl.fcntl(fd, fcntl.F_SETFL, flags)30if fcntl is None:31    # fcntl isn't available on Windows.  By default, handles aren't32    # inherited on Windows, so we can do nothing here.33    _setCloseOnExec = _unsetCloseOnExec = lambda fd: None34else:35    def _setCloseOnExec(fd):36        """37        Make a file descriptor close-on-exec.38        """39        flags = fcntl.fcntl(fd, fcntl.F_GETFD)40        flags = flags | fcntl.FD_CLOEXEC41        fcntl.fcntl(fd, fcntl.F_SETFD, flags)42    def _unsetCloseOnExec(fd):43        """44        Make a file descriptor close-on-exec.45        """46        flags = fcntl.fcntl(fd, fcntl.F_GETFD)47        flags = flags & ~fcntl.FD_CLOEXEC48        fcntl.fcntl(fd, fcntl.F_SETFD, flags)49def readFromFD(fd, callback):50    """51    Read from file descriptor, calling callback with resulting data.52    If successful, call 'callback' with a single argument: the53    resulting data.54    Returns same thing FileDescriptor.doRead would: CONNECTION_LOST,55    CONNECTION_DONE, or None.56    @type fd: C{int}57    @param fd: non-blocking file descriptor to be read from.58    @param callback: a callable which accepts a single argument. If59    data is read from the file descriptor it will be called with this60    data. Handling exceptions from calling the callback is up to the61    caller.62    Note that if the descriptor is still connected but no data is read,63    None will be returned but callback will not be called.64    @return: CONNECTION_LOST on error, CONNECTION_DONE when fd is65    closed, otherwise None.66    """67    try:68        output = os.read(fd, 8192)69    except (OSError, IOError), ioe:70        if ioe.args[0] in (errno.EAGAIN, errno.EINTR):71            return72        else:73            return CONNECTION_LOST74    if not output:75        return CONNECTION_DONE76    callback(output)77def writeToFD(fd, data):78    """79    Write data to file descriptor.80    Returns same thing FileDescriptor.writeSomeData would.81    @type fd: C{int}82    @param fd: non-blocking file descriptor to be written to.83    @type data: C{str} or C{buffer}84    @param data: bytes to write to fd.85    @return: number of bytes written, or CONNECTION_LOST.86    """87    try:88        return os.write(fd, data)89    except (OSError, IOError), io:90        if io.errno in (errno.EAGAIN, errno.EINTR):91            return 092        return CONNECTION_LOST...helpers.py
Source:helpers.py  
...34    return zone_combinations35class HandleUDPBroadcast:36    def connection_made(self, transport):37        pass38    def connection_lost(self, exc):39        pass40class HandleServer(asyncio.Protocol):41    def __init__(self, connection_made, data_received, connection_lost):42        self._connection_made_callback = connection_made43        self._data_received_callback = data_received44        self._connection_lost_callback = connection_lost45    def connection_made(self, transport):46        self._connection_made_callback(transport)47    def data_received(self, data):48        self._data_received_callback(data)49    def connection_lost(self, exc):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
