Best Python code snippet using robotframework
models.py
Source:models.py  
...30        except Exception as e: print(e)31        finally:32            connection.close()33            return result34    def read_all(self):35        try:36            result = None37            connection = pyodbc.connect(connString)38            cursor = connection.cursor()39            cursor.execute(self.queries['read_all'])40            result = cursor.fetchall()41            if not result: raise Exception('(dao_network.read_all) >>> No se encontraron valores.')42        except Exception as e: print(e)43        finally:44            connection.close()45            return result46    def read(self, network):47        try:48            result = None49            connection = pyodbc.connect(connString)50            cursor = connection.cursor()51            cursor.execute(self.queries['read_by_name'], network)52            result = cursor.fetchone()53            if not result: raise Exception('(dao_network.read_by_name) >>> No se encontraron valores.')54        except Exception as e: print(e)55        finally:56            connection.close()57            return result58    def insert(self, network):59        try:60            message = None61            connection = pyodbc.connect(connString)62            cursor = connection.cursor()63            # Chequear que la network no exista aún64            cursor.execute(self.queries['read_by_name'], network)65            exists = cursor.fetchone()66            if not exists:67                cursor.execute(self.queries['insert'], network)68                message = '(dao_network.insert) >>> network \"{0}\" insertada.'.format(network)69            else:70                message = '(dao_network.insert) >>> network \"{0}\" ya existe'.format(network)71        except pyodbc.DatabaseError:72            connection.rollback()73        else:74            connection.commit()75        finally: 76            connection.autocommit = True77            connection.close()78            return message79class ContentType:80    queries = {81        'read_all'      : 'SELECT * FROM ContentType',82        'read_by_name'  : 'SELECT * FROM ContentType WHERE content_type = ?',83        'insert'        : 'INSERT INTO ContentType VALUES(?)',84        'get_id'        : 'SELECT c.id FROM ContentType c WHERE c.content_type = ? '85    }86    def get_id(self, content_type):87        try:88            result = None89            connection = pyodbc.connect(connString)90            cursor = connection.cursor()91            cursor.execute(self.queries['get_id'], content_type)92            result = cursor.fetchone()93            if not result: raise Exception('(dao_contentType.get_id) >>> No se encontraron valores.')94        except Exception as e: print(e)95        finally:96            connection.close()97            return result98    def read_all(self):99        try:100            result = None101            connection = pyodbc.connect(connString)102            cursor = connection.cursor()103            cursor.execute(self.queries['read_all'])104            result = cursor.fetchall()105            if not result: raise Exception('(dao_contentType.read_all) >>> No se encontraron valores.')106        except Exception as e: print(e)107        finally:108            connection.close()109            return result110            111    def read(self, content_type):112        try:113            result = None114            connection = pyodbc.connect(connString)115            cursor = connection.cursor()116            117            cursor.execute(self.queries['read_by_name'], content_type)118            result = cursor.fetchone()119            if not result: raise Exception('(dao_contentType.read_by_name) >>> No se encontraron valores')120        except Exception as e: print(e)121        finally:122            connection.close()123            return result124    def insert(self, content_type):125        try:126            message = None127            connection = pyodbc.connect(connString)128            cursor = connection.cursor()129            # Chequear que no exista aún130            cursor.execute(self.queries['read_by_name'], content_type)131            exists = cursor.fetchone()132            if not exists:133                cursor.execute(self.queries['insert'], content_type)134                message = '(dao_contentType.insert) >>> content_type \"{0}\" insertado.'.format(content_type)135            else:136                message = '(dao_contentType.insert) >>> content_type \"{0}\" ya existe.'.format(content_type)137        except pyodbc.DatabaseError:138            connection.rollback()139        else:140            connection.commit()141        finally:142            connection.autocommit = True143            connection.close()144            return message145class Hashtag:146    queries = {147        'read_all'      : 'SELECT * FROM Hashtag',148        'read_by_name'  : 'SELECT * FROM Hashtag WHERE hashtag = ?',149        'insert'        : 'INSERT INTO Hashtag VALUES(?)'150    }151    def read_all(self):152        try:153            result = None154            connection = pyodbc.connect(connString)155            cursor = connection.cursor()156            cursor.execute(self.queries['read_all'])157            result = cursor.fetchall()158            if not result: raise Exception('(dao_hashtag.read_all) >>> No se encontraron valores.')159        except Exception as e: print(e)160        finally:161            connection.close()162            return result163            164    def read(self, hashtag):165        try:166            result = None167            connection = pyodbc.connect(connString)168            cursor = connection.cursor()169            170            cursor.execute(self.queries['read_by_name'], hashtag)171            result = cursor.fetchone()172            if not result: raise Exception('(dao_hashtag.read_by_name) >>> No se encontraron valores')173        except Exception as e: print(e)174        finally:175            connection.close()176            return result177    def insert(self, hashtag):178        try:179            message = None180            connection = pyodbc.connect(connString)181            cursor = connection.cursor()182            # Chequear que no exista aún183            cursor.execute(self.queries['read_by_name'], hashtag)184            exists = cursor.fetchone()185            if not exists:186                cursor.execute(self.queries['insert'], hashtag)187                message = '(dao_hashtag.insert) >>> hashtag \"{0}\" insertado.'.format(hashtag)188            else:189                message = '(dao_hashtag.insert) >>> hashtag \"{0}\" ya existe.'.format(hashtag)190        except pyodbc.DatabaseError:191            connection.rollback()192        else:193            connection.commit()194        finally:195            connection.autocommit = True196            connection.close()197            return message198class Entity:199    queries = {200        'read_all'      : 'SELECT * FROM Entity',201        'read_by_name'  : 'SELECT * FROM Entity WHERE ent_name = ?',202        'insert'        : 'INSERT INTO Entity VALUES(?)'203    }204    def read_all(self):205        try:206            result = None207            connection = pyodbc.connect(connString)208            cursor = connection.cursor()209            cursor.execute(self.queries['read_all'])210            result = cursor.fetchall()211            if not result: raise Exception('(dao_entity.read_all) >>> No se encontraron valores.')212        except Exception as e: print(e)213        finally:214            connection.close()215            return result216            217    def read(self, entity):218        try:219            result = None220            connection = pyodbc.connect(connString)221            cursor = connection.cursor()222            223            cursor.execute(self.queries['read_by_name'], entity)224            result = cursor.fetchone()225            if not result: raise Exception('(dao_entity.read_by_name) >>> No se encontraron valores')226        except Exception as e: print(e)227        finally:228            connection.close()229            return result230    def insert(self, entity):231        try:232            message = None233            connection = pyodbc.connect(connString)234            cursor = connection.cursor()235            # Chequear que no exista aún236            cursor.execute(self.queries['read_by_name'], entity)237            exists = cursor.fetchone()238            if not exists:239                cursor.execute(self.queries['insert'], entity)240                message = '(dao_entity.insert) >>> entidad \"{0}\" insertada.'.format(entity)241            else:242                message = '(dao_entity.insert) >>> entidad \"{0}\" ya existe.'.format(entity)243        except pyodbc.DatabaseError:244            connection.rollback()245        else:246            connection.commit()247        finally:248            connection.autocommit = True249            connection.close()250            return message251class Rating:252    queries = {253        'read_all'      : 'SELECT * FROM Rating',254        'read_by_date'  : 'SELECT * FROM Rating r WHERE r.rating_date = ?',255        'insert'        : 'INSERT INTO Rating VALUES(?,?)'256    }257    def read_all(self):258        try:259            result = None260            connection = pyodbc.connect(connString)261            cursor = connection.cursor()262            cursor.execute(self.queries['read_all'])263            result = cursor.fetchall()264            if not result: raise Exception('(dao_rating.read_all) >>> No se encontraron valores.')265            result = [(x[0], str(x[1]), x[2]) for x in result]266        except Exception as e: print(e)267        finally:268            connection.close()269            return result270            271    def read(self, rating_date):272        try:273            result = None274            connection = pyodbc.connect(connString)275            cursor = connection.cursor()276            277            cursor.execute(self.queries['read_by_date'], rating_date)278            result = cursor.fetchone()279            if not result: raise Exception('(dao_rating.read_by_date) >>> No se encontraron valores')280            result = (result[0], str(result[1]), result[2])281        except Exception as e: print(e)282        finally:283            connection.close()284            return result285    def insert(self, rating_date, points):286        try:287            message = None288            connection = pyodbc.connect(connString)289            cursor = connection.cursor()290            # Chequear que no exista aún291            cursor.execute(self.queries['read_by_date'], rating_date)292            exists = cursor.fetchone()293            if not exists:294                cursor.execute(self.queries['insert'], rating_date, points)295                message = '(dao_rating.insert) >>> rating \"{0} ~ {1}\" insertado.'.format(rating_date, points)296            else:297                message = '(dao_rating.insert) >>> rating \"{0} ~ {1}\" ya existe.'.format(rating_date, points)298        except pyodbc.DatabaseError:299            connection.rollback()300        else:301            connection.commit()302        finally:303            connection.autocommit = True304            connection.close()305            return message306class Sentiment:307    queries = {308        'read_all'      : 'SELECT * FROM Sentiment',309        'read_by_name'  : 'SELECT * FROM Sentiment s WHERE sentiment = ?',310        'insert'        : 'INSERT INTO Sentiment VALUES(?)',311        'get_id'        : 'SELECT s.id FROM Sentiment s WHERE s.sentiment = ?'312    }313    def get_id(self, sentiment):314        try:315            result = None316            connection = pyodbc.connect(connString)317            cursor = connection.cursor()318            cursor.execute(self.queries['get_id'], sentiment)319            result = cursor.fetchone()320            if not result: raise Exception('(dao_sentiment.get_id) >>> No se encontraron valores.')321        except Exception as e: print(e)322        finally:323            connection.close()324            return result325    def read_all(self):326        try:327            result = None328            connection = pyodbc.connect(connString)329            cursor = connection.cursor()330            cursor.execute(self.queries['read_all'])331            result = cursor.fetchall()332            if not result: raise Exception('(dao_sentiment.read_all) >>> No se encontraron valores.')333        except Exception as e: print(e)334        finally:335            connection.close()336            return result337            338    def read(self, sentiment):339        try:340            result = None341            connection = pyodbc.connect(connString)342            cursor = connection.cursor()343            344            cursor.execute(self.queries['read_by_name'], sentiment)345            result = cursor.fetchone()346            if not result: raise Exception('(dao_sentiment.read_by_name) >>> No se encontraron valores')347        except Exception as e: print(e)348        finally:349            connection.close()350            return result351    def insert(self, sentiment):352        try:353            message = None354            connection = pyodbc.connect(connString)355            cursor = connection.cursor()356            # Chequear que no exista aún357            cursor.execute(self.queries['read_by_name'], sentiment)358            exists = cursor.fetchone()359            if not exists:360                cursor.execute(self.queries['insert'], sentiment)361                message = '(dao_sentiment.insert) >>> sentimento \"{0}\" insertado.'.format(sentiment)362            else:363                message = '(dao_sentiment.insert) >>> sentimento \"{0}\" ya existe.'.format(sentiment)364        except pyodbc.DatabaseError:365            connection.rollback()366        else:367            connection.commit()368        finally:369            connection.autocommit = True370            connection.close()371            return message372class UserProfile:373    queries = {374        'read_all'      : 'SELECT * FROM UserProfile',375        'read_by_name'  : 'SELECT * FROM UserProfile WHERE profile_name = ?',376        'insert'        : 'INSERT INTO UserProfile VALUES(?,?)',377        'get_id'        : 'SELECT u.id FROM UserProfile u WHERE u.profile_name = ? '378    }379    def get_id(self, usuario):380        try:381            result = None382            connection = pyodbc.connect(connString)383            cursor = connection.cursor()384            cursor.execute(self.queries['get_id'], usuario)385            result = cursor.fetchone()386            if not result: raise Exception('(dao_user_profile.get_id) >>> No se encontraron valores.')387        except Exception as e: print(e)388        finally:389            connection.close()390            return result391    def read_all(self):392        try:393            result = None394            connection = pyodbc.connect(connString)395            cursor = connection.cursor()396            cursor.execute(self.queries['read_all'])397            result = cursor.fetchall()398            if not result: raise Exception('(dao_userProfile.read_all) >>> No se encontraron valores.')399        except Exception as e: print(e)400        finally:401            connection.close()402            return result403            404    def read(self, profile_name):405        try:406            result = None407            connection = pyodbc.connect(connString)408            cursor = connection.cursor()409            410            cursor.execute(self.queries['read_by_name'], profile_name)411            result = cursor.fetchone()412            if not result: raise Exception('(dao_userProfile.read_by_name) >>> No se encontraron valores')413        except Exception as e: print(e)414        finally:415            connection.close()416            return result417    def insert(self, profile_name, followers):418        try:419            message = None420            connection = pyodbc.connect(connString)421            cursor = connection.cursor()422            # Chequear que no exista aún423            cursor.execute(self.queries['read_by_name'], profile_name)424            exists = cursor.fetchone()425            if not exists:426                cursor.execute(self.queries['insert'], profile_name, followers)427                message = '(dao_userProfile.insert) >>> usuario \"{0} ~ {1}\" insertado.'.format(profile_name, followers)428            else:429                message = '(dao_userProfile.insert) >>> usuario \"{0} ~ {1}\" ya existe.'.format(profile_name, followers)430        except pyodbc.DatabaseError:431            connection.rollback()432        else:433            connection.commit()434        finally:435            connection.autocommit = True436            connection.close()437            return message438class Post:439    queries = {440        'read_all'      : 'SELECT * FROM Post',441        'read_by_id_message'  : 'SELECT * FROM Post WHERE id_message = ?',442        'insert'        : 'INSERT INTO Post VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)'443    }444    def read_all(self):445        try:446            result = None447            connection = pyodbc.connect(connString)448            cursor = connection.cursor()449            cursor.execute(self.queries['read_all'])450            result = cursor.fetchall()451            if not result: raise Exception('(dao_userProfile.read_all) >>> No se encontraron valores.')452        except Exception as e: print(e)453        finally:454            connection.close()455            return result456            457    def read(self, id_message):458        try:...test_fileobj.py
Source:test_fileobj.py  
1"""2Tests for load()ing and dump()ing with file-like objects.3"""4import multiprocessing5import os6import pathlib7import platform8import queue9import shutil10import socket11import socketserver12import tempfile13import traceback14import unittest15import numpy as np16import kastore as kas17IS_WINDOWS = platform.system() == "Windows"18class DictVerifyMixin:19    def verify_dicts_equal(self, d1, d2):20        self.assertEqual(sorted(d1.keys()), sorted(d2.keys()))21        for key in d1.keys():22            np.testing.assert_equal(d1[key], d2[key])23# pathlib.Path objects should work  transparently, and these tests check it24# isn't broken by fileobj-enabling code.25class PathlibMixin(DictVerifyMixin):26    def setUp(self):27        fd, path = tempfile.mkstemp(prefix="kas_test_pathlib")28        os.close(fd)29        self.temp_file = pathlib.Path(path)30    def tearDown(self):31        self.temp_file.unlink()32    def test_dump_to_pathlib_Path(self):33        data = {"a": np.arange(10)}34        kas.dump(data, self.temp_file, engine=self.engine)35    def test_load_from_pathlib_Path(self):36        data = {"a": np.arange(10)}37        kas.dump(data, str(self.temp_file), engine=self.engine)38        file_size = self.temp_file.stat().st_size39        for read_all in [True, False]:40            data_out = kas.load(self.temp_file, read_all=read_all, engine=self.engine)41            data2 = dict(data_out.items())42            file_size2 = self.temp_file.stat().st_size43            self.verify_dicts_equal(data, data2)44            self.assertEqual(file_size, file_size2)45class FileobjMixin(DictVerifyMixin):46    def setUp(self):47        fd, path = tempfile.mkstemp(prefix="kas_test_fileobj")48        os.close(fd)49        self.temp_file = path50    def tearDown(self):51        os.unlink(self.temp_file)52    def test_dump_fileobj_single(self):53        data = {"a": np.arange(10)}54        with open(self.temp_file, "wb") as f:55            kas.dump(data, f, engine=self.engine)56        data_out = kas.load(self.temp_file, engine=self.engine)57        data2 = dict(data_out.items())58        self.verify_dicts_equal(data, data2)59    def test_dump_fileobj_multi(self):60        with open(self.temp_file, "wb") as f:61            for i in range(10):62                data = {63                    "i" + str(i): np.arange(i, dtype=int),64                    "f" + str(i): np.arange(i, dtype=float),65                }66                kas.dump(data, f, engine=self.engine)67    def test_load_fileobj_single(self):68        data = {"a": np.arange(10)}69        kas.dump(data, self.temp_file, engine=self.engine)70        file_size = os.stat(self.temp_file).st_size71        for read_all in [True, False]:72            with open(self.temp_file, "rb") as f:73                data_out = kas.load(f, read_all=read_all, engine=self.engine)74                data2 = dict(data_out.items())75                file_offset = f.tell()76            self.verify_dicts_equal(data, data2)77            self.assertEqual(file_offset, file_size)78    def test_load_and_dump_fileobj_single(self):79        data = {"a": np.arange(10)}80        with open(self.temp_file, "wb") as f:81            kas.dump(data, f, engine=self.engine)82        file_size = os.stat(self.temp_file).st_size83        for read_all in [True, False]:84            with open(self.temp_file, "rb") as f:85                data_out = kas.load(f, read_all=read_all, engine=self.engine)86                data2 = dict(data_out.items())87                file_offset = f.tell()88            self.verify_dicts_equal(data, data2)89            self.assertEqual(file_offset, file_size)90    def test_load_and_dump_fileobj_multi(self):91        datalist = [92            {93                "i" + str(i): i + np.arange(10 ** 5, dtype=int),94                "f" + str(i): i + np.arange(10 ** 5, dtype=float),95            }96            for i in range(10)97        ]98        file_offsets = []99        with open(self.temp_file, "wb") as f:100            for data in datalist:101                kas.dump(data, f, engine=self.engine)102                file_offsets.append(f.tell())103        for read_all in [True, False]:104            with open(self.temp_file, "rb") as f:105                for data, file_offset in zip(datalist, file_offsets):106                    data_out = kas.load(f, read_all=read_all, engine=self.engine)107                    data2 = dict(data_out.items())108                    file_offset2 = f.tell()109                    self.verify_dicts_equal(data, data2)110                    self.assertEqual(file_offset, file_offset2)111    def test_load_and_dump_file_single_rw(self):112        data = {"a": np.arange(10)}113        with open(self.temp_file, "r+b") as f:114            kas.dump(data, f, engine=self.engine)115            for read_all in [True, False]:116                f.seek(0)117                data_out = kas.load(f, read_all=read_all, engine=self.engine)118                data2 = dict(data_out.items())119                self.verify_dicts_equal(data, data2)120    def test_load_and_dump_file_multi_rw(self):121        datalist = [122            {123                "i" + str(i): i + np.arange(10 ** 5, dtype=int),124                "f" + str(i): i + np.arange(10 ** 5, dtype=float),125            }126            for i in range(10)127        ]128        with open(self.temp_file, "r+b") as f:129            for data in datalist:130                kas.dump(data, f, engine=self.engine)131            for read_all in [True, False]:132                f.seek(0)133                for data in datalist:134                    data_out = kas.load(f, read_all=read_all, engine=self.engine)135                    data2 = dict(data_out.items())136                    self.verify_dicts_equal(data, data2)137    def test_load_and_dump_fd_single_rw(self):138        data = {"a": np.arange(10)}139        with open(self.temp_file, "r+b") as f:140            fd = f.fileno()141            kas.dump(data, fd, engine=self.engine)142            for read_all in [True, False]:143                os.lseek(fd, 0, os.SEEK_SET)144                data_out = kas.load(fd, read_all=read_all, engine=self.engine)145                data2 = dict(data_out.items())146                self.verify_dicts_equal(data, data2)147    def test_load_and_dump_fd_multi_rw(self):148        datalist = [149            {150                "i" + str(i): i + np.arange(10 ** 5, dtype=int),151                "f" + str(i): i + np.arange(10 ** 5, dtype=float),152            }153            for i in range(20)154        ]155        with open(self.temp_file, "r+b") as f:156            fd = f.fileno()157            for data in datalist:158                kas.dump(data, fd, engine=self.engine)159            for read_all in [True, False]:160                os.lseek(fd, 0, os.SEEK_SET)161                for data in datalist:162                    data_out = kas.load(fd, read_all=read_all, engine=self.engine)163                    data2 = dict(data_out.items())164                    self.verify_dicts_equal(data, data2)165def dump_to_stream(q_err, q_in, file_out, engine):166    """167    Get data dicts from `q_in` and kas.dump() them to `file_out`.168    Uncaught exceptions are placed onto the `q_err` queue.169    """170    try:171        with open(file_out, "wb") as f:172            while True:173                data = q_in.get()174                if data is None:175                    break176                kas.dump(data, f, engine=engine)177    except Exception as exc:178        tb = traceback.format_exc()179        q_err.put((exc, tb))180def load_from_stream(q_err, q_out, file_in, engine, read_all):181    """182    kas.load() stores from `file_in` and put their data onto `q_out`.183    Uncaught exceptions are placed onto the `q_err` queue.184    """185    try:186        with open(file_in, "rb") as f:187            while True:188                try:189                    data = kas.load(f, read_all=read_all, engine=engine)190                except EOFError:191                    break192                q_out.put(dict(data.items()))193    except Exception as exc:194        tb = traceback.format_exc()195        q_err.put((exc, tb))196class StreamingMixin(DictVerifyMixin):197    def setUp(self):198        self.temp_dir = tempfile.mkdtemp(prefix="kas_test_streaming")199        self.temp_fifo = os.path.join(self.temp_dir, "fifo")200        os.mkfifo(self.temp_fifo)201    def tearDown(self):202        shutil.rmtree(self.temp_dir)203    def stream(self, datalist, read_all=True):204        """205        data -> q_in -> kas.dump(..., fifo) -> kas.load(fifo) -> q_out -> data_out206        """207        q_err = multiprocessing.Queue()208        q_in = multiprocessing.Queue()209        q_out = multiprocessing.Queue()210        proc1 = multiprocessing.Process(211            target=dump_to_stream, args=(q_err, q_in, self.temp_fifo, self.engine)212        )213        proc2 = multiprocessing.Process(214            target=load_from_stream,215            args=(q_err, q_out, self.temp_fifo, self.engine, read_all),216        )217        proc1.start()218        proc2.start()219        for data in datalist:220            q_in.put(data)221        q_in.put(None)  # signal the process that we're done222        proc1.join(timeout=3)223        if not q_err.empty():224            # re-raise the first child exception225            exc, tb = q_err.get()226            print(tb)227            raise exc228        if proc1.is_alive():229            # prevent hang if proc1 failed to join230            proc1.terminate()231            proc2.terminate()232            self.assertTrue(False, msg="proc1 (kas.dump) failed to join")233        datalist_out = []234        for _ in datalist:235            try:236                data_out = q_out.get(timeout=3)237            except queue.Empty:238                # terminate proc2 so we don't hang239                proc2.terminate()240                raise241            datalist_out.append(data_out)242        proc2.join(timeout=3)243        if proc2.is_alive():244            # prevent hang if proc2 failed to join245            proc2.terminate()246            self.assertTrue(False, msg="proc2 (kas.load) failed to join")247        self.assertEqual(len(datalist), len(datalist_out))248        for data, data_out in zip(datalist, datalist_out):249            self.verify_dicts_equal(data, data_out)250    def test_stream_single(self):251        datalist = [{"a": np.array([0])}]252        self.stream(datalist)253    def test_stream_multi(self):254        datalist = [255            {256                "i" + str(i): i + np.arange(10 ** 5, dtype=int),257                "f" + str(i): i + np.arange(10 ** 5, dtype=float),258            }259            for i in range(100)260        ]261        self.stream(datalist)262ADDRESS = ("localhost", 10009)263class TestServer(socketserver.ThreadingTCPServer):264    allow_reuse_address = True265class StoreEchoHandler(socketserver.BaseRequestHandler):266    def handle(self):267        while True:268            try:269                data = kas.load(270                    self.request.fileno(), engine=self.engine, read_all=True271                )272            except EOFError:273                break274            kas.dump(dict(data), self.request.fileno(), engine=self.engine)275        # We only read one list, so shutdown the server straight away276        self.server.shutdown()277class StoreEchoHandlerCEngine(StoreEchoHandler):278    engine = kas.C_ENGINE279class StoreEchoHandlerPyEngine(StoreEchoHandler):280    engine = kas.PY_ENGINE281def server_process(engine, q):282    handlers = {283        kas.C_ENGINE: StoreEchoHandlerCEngine,284        kas.PY_ENGINE: StoreEchoHandlerPyEngine,285    }286    server = TestServer(ADDRESS, handlers[engine])287    # Tell the client (on the other end of the queue) that it's OK to open288    # a connection289    q.put(None)290    server.serve_forever()291class SocketMixin(DictVerifyMixin):292    def setUp(self):293        # Use a queue to synchronise the startup of the server and the client.294        q = multiprocessing.Queue()295        self.server_process = multiprocessing.Process(296            target=server_process, args=(self.engine, q)297        )298        self.server_process.start()299        q.get()300        self.client = socket.create_connection(ADDRESS)301    def tearDown(self):302        self.client.close()303        self.server_process.join()304    def verify_stream(self, data_list):305        fd = self.client.fileno()306        for data in data_list:307            kas.dump(data, fd, engine=self.engine)308            echo_data = kas.load(fd, read_all=True, engine=self.engine)309            self.verify_dicts_equal(data, echo_data)310    def test_single(self):311        self.verify_stream([{"a": np.arange(10)}])312    def test_two(self):313        self.verify_stream([{"a": np.zeros(10)}, {"b": np.zeros(100)}])314    def test_multi(self):315        datalist = [316            {317                "i" + str(i): i + np.arange(10 ** 5, dtype=int),318                "f" + str(i): i + np.arange(10 ** 5, dtype=float),319            }320            for i in range(10)321        ]322        self.verify_stream(datalist)323class TestPathlibCEngine(PathlibMixin, unittest.TestCase):324    engine = kas.C_ENGINE325class TestPathlibPyEngine(PathlibMixin, unittest.TestCase):326    engine = kas.PY_ENGINE327class TestFileobjCEngine(FileobjMixin, unittest.TestCase):328    engine = kas.C_ENGINE329class TestFileobjPyEngine(FileobjMixin, unittest.TestCase):330    engine = kas.PY_ENGINE331@unittest.skipIf(IS_WINDOWS, "FIFOs don't exist on Windows")332class TestStreamingCEngine(StreamingMixin, unittest.TestCase):333    engine = kas.C_ENGINE334@unittest.skipIf(IS_WINDOWS, "FIFOs don't exist on Windows")335class TestStreamingPyEngine(StreamingMixin, unittest.TestCase):336    engine = kas.PY_ENGINE337@unittest.skipIf(IS_WINDOWS, "Deadlocking on Windows")338class TestSocketCEngine(SocketMixin, unittest.TestCase):339    engine = kas.C_ENGINE340@unittest.skipIf(IS_WINDOWS, "Deadlocking on Windows")341class TestSocketPyEngine(SocketMixin, unittest.TestCase):...test_errors.py
Source:test_errors.py  
1"""2Tests for error conditions.3"""4import os5import platform6import struct7import tempfile8import unittest9import numpy as np10import kastore as kas11import kastore.store as store12IS_WINDOWS = platform.system() == "Windows"13class InterfaceMixin:14    """15    Exercise the low-level interface.16    """17    def setUp(self):18        fd, path = tempfile.mkstemp(prefix="kas_test_errors")19        os.close(fd)20        self.temp_file = path21    def tearDown(self):22        os.unlink(self.temp_file)23    def test_bad_dicts(self):24        for bad_dict in [[], "w34", None, 1]:25            self.assertRaises(26                TypeError, kas.dump, bad_dict, self.temp_file, engine=self.engine27            )28            self.assertRaises(29                TypeError, kas.dump, bad_dict, self.temp_file, engine=self.engine30            )31    def test_bad_filename_type(self):32        for bad_filename in [[], None, {}]:33            self.assertRaises(TypeError, kas.dump, {}, bad_filename, engine=self.engine)34            self.assertRaises(TypeError, kas.dump, {}, bad_filename, engine=self.engine)35            self.assertRaises(TypeError, kas.load, bad_filename, engine=self.engine)36            self.assertRaises(TypeError, kas.load, bad_filename, engine=self.engine)37    def test_bad_keys(self):38        a = np.zeros(1)39        for bad_key in [(1234,), b"1234", None, 1234]:40            with self.assertRaises(TypeError):41                kas.dump({bad_key: a}, self.temp_file, engine=self.engine)42    def test_bad_arrays(self):43        kas.dump({"a": []}, self.temp_file, engine=self.engine)44        for bad_array in [kas, lambda x: x, "1234", None, [[0, 1], [0, 2]]]:45            self.assertRaises(46                ValueError,47                kas.dump,48                {"a": bad_array},49                self.temp_file,50                engine=self.engine,51            )52        # TODO add tests for arrays in fortran order and so on.53    def test_file_not_found(self):54        a = np.zeros(1)55        for bad_file in ["no_such_file", "/no/such/file"]:56            self.assertRaises(FileNotFoundError, kas.load, bad_file, engine=self.engine)57        self.assertRaises(58            FileNotFoundError, kas.dump, {"a": a}, "/no/such/file", engine=self.engine59        )60    def test_file_is_a_directory(self):61        tmp_dir = tempfile.mkdtemp()62        try:63            exception = IsADirectoryError64            if IS_WINDOWS:65                exception = PermissionError66            self.assertRaises(67                exception, kas.dump, {"a": []}, tmp_dir, engine=self.engine68            )69            self.assertRaises(exception, kas.load, tmp_dir, engine=self.engine)70        finally:71            os.rmdir(tmp_dir)72class TestInterfacePyEngine(InterfaceMixin, unittest.TestCase):73    engine = kas.PY_ENGINE74class TestInterfaceCEngine(InterfaceMixin, unittest.TestCase):75    engine = kas.C_ENGINE76class TestEngines(unittest.TestCase):77    """78    Check that we correctly identify bad engines79    """80    bad_engines = [None, {}, "no such engine", b"not an engine"]81    def test_bad_engine_dump(self):82        for bad_engine in self.bad_engines:83            self.assertRaises(ValueError, kas.dump, {}, "", engine=bad_engine)84    def test_bad_engine_load(self):85        for bad_engine in self.bad_engines:86            self.assertRaises(ValueError, kas.load, "", engine=bad_engine)87class FileFormatsMixin:88    """89    Common utilities for tests on the file format.90    """91    def setUp(self):92        fd, path = tempfile.mkstemp(prefix="kas_malformed_files")93        os.close(fd)94        self.temp_file = path95    def tearDown(self):96        os.unlink(self.temp_file)97    def write_file(self, num_items=0):98        data = {}99        for j in range(num_items):100            data["a" * (j + 1)] = np.arange(j)101        kas.dump(data, self.temp_file)102class MalformedFilesMixin(FileFormatsMixin):103    """104    Tests for various types of malformed intput files.105    """106    def test_empty_file(self):107        with open(self.temp_file, "w"):108            pass109        self.assertEqual(os.path.getsize(self.temp_file), 0)110        self.assertRaises(111            EOFError,112            kas.load,113            self.temp_file,114            engine=self.engine,115            read_all=self.read_all,116        )117    def test_bad_magic(self):118        self.write_file()119        with open(self.temp_file, "rb") as f:120            buff = bytearray(f.read())121        before_len = len(buff)122        buff[0:8] = b"12345678"123        self.assertEqual(len(buff), before_len)124        with open(self.temp_file, "wb") as f:125            f.write(buff)126        self.assertRaises(127            kas.FileFormatError,128            kas.load,129            self.temp_file,130            engine=self.engine,131            read_all=self.read_all,132        )133    def test_bad_file_size(self):134        for num_items in range(10):135            for offset in [-2, -1, 1, 2 ** 10]:136                self.write_file(num_items)137                file_size = os.path.getsize(self.temp_file)138                with open(self.temp_file, "rb") as f:139                    buff = bytearray(f.read())140                before_len = len(buff)141                buff[16:24] = struct.pack("<Q", file_size + offset)142                self.assertEqual(len(buff), before_len)143                with open(self.temp_file, "wb") as f:144                    f.write(buff)145                with self.assertRaises(kas.FileFormatError):146                    kas.load(self.temp_file, engine=self.engine, read_all=self.read_all)147    def test_truncated_file_descriptors(self):148        for num_items in range(2, 5):149            self.write_file(num_items)150            with open(self.temp_file, "rb") as f:151                buff = bytearray(f.read())152            with open(self.temp_file, "wb") as f:153                f.write(buff[: num_items * store.ITEM_DESCRIPTOR_SIZE - 1])154            with self.assertRaises(kas.FileFormatError):155                kas.load(self.temp_file, engine=self.engine, read_all=self.read_all)156    def test_truncated_file_data(self):157        for num_items in range(2, 5):158            self.write_file(num_items)159            with open(self.temp_file, "rb") as f:160                buff = bytearray(f.read())161            with open(self.temp_file, "wb") as f:162                f.write(buff[:-1])163            with self.assertRaises(kas.FileFormatError):164                # Must call dict to ensure all the keys are loaded.165                dict(166                    kas.load(self.temp_file, engine=self.engine, read_all=self.read_all)167                )168    def test_bad_item_types(self):169        items = {"a": []}170        descriptors, file_size = store.pack_items(items)171        num_types = len(store.np_dtype_to_type_map)172        for bad_type in [num_types + 1, 2 * num_types]:173            with open(self.temp_file, "wb") as f:174                descriptors[0].type = bad_type175                store.write_file(f, descriptors, file_size)176            with self.assertRaises(kas.FileFormatError):177                kas.load(self.temp_file, engine=self.engine, read_all=self.read_all)178    def test_bad_key_initial_offsets(self):179        items = {"a": np.arange(100)}180        # First key offset must be at header_size + n * (descriptor_size)181        for offset in [-1, +1, 2, 100]:182            # First key offset must be at header_size + n * (descriptor_size)183            descriptors, file_size = store.pack_items(items)184            descriptors[0].key_start += offset185            with open(self.temp_file, "wb") as f:186                store.write_file(f, descriptors, file_size)187            self.assertRaises(188                kas.FileFormatError,189                kas.load,190                self.temp_file,191                engine=self.engine,192                read_all=self.read_all,193            )194    def test_bad_key_non_sequential(self):195        items = {"a": np.arange(100), "b": []}196        # Keys must be packed sequentially.197        for offset in [-1, +1, 2, 100]:198            descriptors, file_size = store.pack_items(items)199            descriptors[1].key_start += offset200            with open(self.temp_file, "wb") as f:201                store.write_file(f, descriptors, file_size)202            self.assertRaises(203                kas.FileFormatError,204                kas.load,205                self.temp_file,206                engine=self.engine,207                read_all=self.read_all,208            )209    def test_bad_array_initial_offset(self):210        items = {"a": np.arange(100)}211        for offset in [-100, -1, +1, 2, 8, 16, 100]:212            # First key offset must be at header_size + n * (descriptor_size)213            descriptors, file_size = store.pack_items(items)214            descriptors[0].array_start += offset215            with open(self.temp_file, "wb") as f:216                store.write_file(f, descriptors, file_size)217            self.assertRaises(218                kas.FileFormatError,219                kas.load,220                self.temp_file,221                engine=self.engine,222                read_all=self.read_all,223            )224    def test_bad_array_non_sequential(self):225        items = {"a": np.arange(100), "b": []}226        for offset in [-1, 1, 2, -8, 8, 100]:227            descriptors, file_size = store.pack_items(items)228            descriptors[1].array_start += offset229            with open(self.temp_file, "wb") as f:230                store.write_file(f, descriptors, file_size)231            self.assertRaises(232                kas.FileFormatError,233                kas.load,234                self.temp_file,235                engine=self.engine,236                read_all=self.read_all,237            )238    def test_bad_array_alignment(self):239        items = {"a": np.arange(100, dtype=np.int8), "b": []}240        descriptors, file_size = store.pack_items(items)241        descriptors[0].array_start += 1242        descriptors[0].array_len -= 1243        with open(self.temp_file, "wb") as f:244            store.write_file(f, descriptors, file_size)245        with self.assertRaises(kas.FileFormatError):246            kas.load(self.temp_file, engine=self.engine, read_all=self.read_all)247    def test_bad_array_packing(self):248        items = {"a": np.arange(100, dtype=np.int8), "b": []}249        descriptors, file_size = store.pack_items(items)250        descriptors[0].array_start += 8251        descriptors[0].array_len -= 8252        with open(self.temp_file, "wb") as f:253            store.write_file(f, descriptors, file_size)254        with self.assertRaises(kas.FileFormatError):255            kas.load(self.temp_file, engine=self.engine, read_all=self.read_all)256class TestMalformedFilesPyEngine(MalformedFilesMixin, unittest.TestCase):257    read_all = False258    engine = kas.PY_ENGINE259class TestMalformedFilesCEngine(MalformedFilesMixin, unittest.TestCase):260    read_all = False261    engine = kas.C_ENGINE262class TestMalformedFilesPyEngineReadAll(MalformedFilesMixin, unittest.TestCase):263    read_all = True264    engine = kas.PY_ENGINE265class TestMalformedFilesCEngineReadAll(MalformedFilesMixin, unittest.TestCase):266    read_all = True267    engine = kas.C_ENGINE268class FileVersionsMixin(FileFormatsMixin):269    """270    Tests for the file major version.271    """272    def verify_major_version(self, version):273        self.write_file()274        with open(self.temp_file, "rb") as f:275            buff = bytearray(f.read())276        before_len = len(buff)277        buff[8:10] = struct.pack("<H", version)278        self.assertEqual(len(buff), before_len)279        with open(self.temp_file, "wb") as f:280            f.write(buff)281        kas.load(self.temp_file, engine=self.engine, read_all=self.read_all)282    def test_major_version_too_old(self):283        self.assertRaises(284            kas.VersionTooOldError, self.verify_major_version, store.VERSION_MAJOR - 1285        )286    def test_major_version_too_new(self):287        for j in range(1, 5):288            self.assertRaises(289                kas.VersionTooNewError,290                self.verify_major_version,291                store.VERSION_MAJOR + j,292            )293class TestFileVersionsPyEngine(FileVersionsMixin, unittest.TestCase):294    engine = kas.PY_ENGINE295    read_all = False296class TestFileVersionsPyEngineReadAll(FileVersionsMixin, unittest.TestCase):297    engine = kas.PY_ENGINE298    read_all = True299class TestFileVersionsCEngine(FileVersionsMixin, unittest.TestCase):300    engine = kas.C_ENGINE301    read_all = False302class TestFileVersionsCEngineReadAll(FileVersionsMixin, unittest.TestCase):303    engine = kas.C_ENGINE...gather_test.py
Source:gather_test.py  
1import logging2import pytest3import msrc.appconfig.read_all4from common import AllTypes, WithDefaults5from common import all_json, all_types_instance, all_values, all_args6from common import all_env, mk_class7# Normal usage8# ============9def test_builtin_defaults():10    "No data required when all values have defaults."11    config, _ = msrc.appconfig.read_all.gather(WithDefaults)12    assert (999,) == config13    with pytest.raises(RuntimeError):14        msrc.appconfig.read_all.gather(AllTypes)15def test_override_defaults():16    "All values can come from `override_defaults`."17    loaded, _ = msrc.appconfig.read_all.gather(18        AllTypes, override_defaults=all_values)19    assert loaded == all_types_instance20def test_override_builtin_defaults():21    "`override_defaults` override built-in defaults."22    config, _ = msrc.appconfig.read_all.gather(23        WithDefaults,24        override_defaults=dict(a=1))25    assert (1,) == config26def test_config_file(tmp_path):27    "All values come from a configuration file in `config_files`."28    (tmp_path/"conf.json").write_text(all_json)29    path = (tmp_path/"conf.json").as_posix()30    config, _ = msrc.appconfig.read_all.gather(AllTypes, config_files=[path])31    assert all_types_instance == config32def test_config_file_argv(tmp_path):33    "All values come from a configuration file in `argv`."34    (tmp_path/"conf.json").write_text(all_json)35    path = (tmp_path/"conf.json").as_posix()36    config, _ = msrc.appconfig.read_all.gather(AllTypes, argv=["-c", path])37    assert all_types_instance == config38def test_config_file_sysargv(tmp_path, mocker):39    "All values come from a configuration file in `sys.argv`."40    (tmp_path/"conf.json").write_text(all_json)41    path = (tmp_path/"conf.json").as_posix()42    mocker.patch("sys.argv", new=["script", "-c", path])43    config, _ = msrc.appconfig.read_all.gather(AllTypes)44    assert all_types_instance == config45def test_config_file_combines_argv(mocker):46    "Option in `argv` adds to paths in `config_files`."47    from_file = mocker.patch("msrc.appconfig.read_all._from_file")48    config, _ = msrc.appconfig.read_all.gather(49        WithDefaults,50        config_files=["a"],51        argv=["-c", "b"])52    assert from_file.call_count == 253    # assert call_arg_list54def test_argv_overrides_sysargv(tmp_path, mocker):55    "Option in `argv` prevents looking at `sys.argv`."56    (tmp_path/"conf.json").write_text(all_json)57    path = (tmp_path/"conf.json").as_posix()58    mocker.patch("sys.argv", new=["script", "-c", "nofile.yml"])59    config, _ = msrc.appconfig.read_all.gather(AllTypes, argv=["-c", path])60    assert all_types_instance == config61def test_config_file_overrides_builtin(tmp_path):62    "Values from `config_files` override built-in defaults."63    (tmp_path/"conf.yaml").write_text("a: 2")64    path = (tmp_path/"conf.yaml").as_posix()65    config, _ = msrc.appconfig.read_all.gather(66        WithDefaults, config_files=[path])67    assert (2,) == config68def test_config_file_overrides_defaults(tmp_path):69    "Values from `config_files` override built-in defaults."70    (tmp_path/"conf.yaml").write_text("a: 2")71    path = (tmp_path/"conf.yaml").as_posix()72    config, _ = msrc.appconfig.read_all.gather(73        WithDefaults,74        override_defaults=dict(a=1),75        config_files=[path])76    assert (2,) == config77def test_env(mocker):78    "All values come from shell environment."79    mocker.patch("os.environ", new=all_env)80    config, _ = msrc.appconfig.read_all.gather(AllTypes, env_var_prefix="PRE_")81    assert all_types_instance == config82def test_env_overrides_config_file(tmp_path, mocker):83    "Values from env override config files."84    (tmp_path/"conf.yaml").write_text("a: 2")85    path = (tmp_path/"conf.yaml").as_posix()86    mocker.patch("sys.argv", new=["script", "-c", path])87    mocker.patch("os.environ", new=dict(_a="3"))88    config, _ = msrc.appconfig.read_all.gather(89        WithDefaults,90        override_defaults=dict(a=1),91        env_var_prefix="_")92    assert (3,) == config93def test_argv():94    "All values come from argv."95    config, _ = msrc.appconfig.read_all.gather(AllTypes, argv=all_args)96    assert all_types_instance == config97def test_arg_overrides_env(tmp_path, mocker):98    "Values from env override config files."99    (tmp_path/"conf.yaml").write_text("a: 2")100    path = (tmp_path/"conf.yaml").as_posix()101    mocker.patch("sys.argv", new=["script", "-c", path, "--a", "4"])102    mocker.patch("os.environ", new=dict(_a="3"))103    config, _ = msrc.appconfig.read_all.gather(104        WithDefaults,105        override_defaults=dict(a=1),106        env_var_prefix="_")107    assert (4,) == config108# Exceptions109# ==========110def test_gather_invalid_defaults():111    "Data in `override_defailts` must be parseable."112    with pytest.raises(ValueError):113        msrc.appconfig.read_all.gather(114            AllTypes,115            override_defaults=dict(nested=0))116def test_config_file_not_found():117    "Config file must exist."118    with pytest.raises(FileNotFoundError):119        msrc.appconfig.read_all.gather(AllTypes, config_files=["nofile.yml"])120# Help121# =====122def test_help(capsys, mocker):123    mocker.patch("sys.argv", new=["script", "-h", "--a", "4"])124    with pytest.raises(SystemExit):125        msrc.appconfig.read_all.gather(AllTypes, arg_aliases=dict(o="option"))126    captured = capsys.readouterr()127    assert captured.err == ''128    expected = [129        "--string STR",130        "--integer INT",131        "--fractional FLOAT",132        "--boolean [BOOL]",133        "-o En, --option En",134        "--strings STR STR",135        "--integers INT INT",136        "--fractionals FLOAT FLOAT",137        "--nested.booleans BOOL BOOL  (*)",138        "--nested.options En En"139    ]140    actual = captured.out.strip('\r\n').splitlines()141    assert actual[-len(expected):] == expected142def test_help_arg(capsys, mocker):143    mocker.patch("sys.argv", new=["script", "-h", "nested.booleans"])144    with pytest.raises(SystemExit):145        msrc.appconfig.read_all.gather(AllTypes)146    captured = capsys.readouterr()147    assert captured.err == ''148    actual = captured.out.strip('\r\n').splitlines()149    assert actual[1] == AllTypes.__annotations__[150        "nested"]._field_help["booleans"]151def test_nohelp_arg(capsys, mocker):152    mocker.patch("sys.argv", new=["script", "-h", "string"])153    with pytest.raises(SystemExit):154        msrc.appconfig.read_all.gather(AllTypes)155    captured = capsys.readouterr()156    assert captured.err == ''157    actual = captured.out.strip('\r\n').splitlines()158    assert actual[0] == "--string STR"159    assert actual[1].startswith('(')160def test_help_noarg(capsys, mocker):161    mocker.patch("sys.argv", new=["script", "-h", "nested.b"])162    with pytest.raises(SystemExit):163        msrc.appconfig.read_all.gather(AllTypes)164    captured = capsys.readouterr()165    assert captured.err == ''166    actual = captured.out.strip('\r\n').splitlines()167    assert actual[0].index("nested.b") > 0168def test_no_help(mocker):169    schema = mk_class(("help", str), ("hero", bool))170    mocker.patch("sys.argv", new=["script", "-h", "--help", "test"])171    actual, _ = msrc.appconfig.read_all.gather(172        schema, arg_aliases=dict(h="hero"))173    assert actual.hero174    assert actual.help == "test"175# Logging176# ========177def test_log_default(mocker):178    """By default logging set to INFO using basicConfig."""179    mock = mocker.patch("logging.basicConfig")180    mocker.patch("msrc.appconfig.read_all.logger.hasHandlers",181                 return_value=False)182    c, _ = msrc.appconfig.read_all.gather(183        mk_class(("a", int)), argv=["--a", "34"])184    mock.assert_called_once_with(level=logging.INFO)185    assert c.a == 34186def test_log_preconfigured(mocker):187    """If logging is set up just set level for the package logger."""188    mock = mocker.patch("msrc.appconfig.read_all.logger.setLevel")189    mocker.patch("msrc.appconfig.read_all.logger.hasHandlers",190                 return_value=True)191    c, _ = msrc.appconfig.read_all.gather(192        mk_class(("a", int)), argv=["--a", "34"])193    mock.assert_called_once_with(logging.INFO)194    assert c.a == 34195def test_log_override_log_level(mocker):196    """By default logging set to INFO using basicConfig."""197    mock = mocker.patch("logging.basicConfig")198    mocker.patch("msrc.appconfig.read_all.logger.hasHandlers",199                 return_value=False)200    c, _ = msrc.appconfig.read_all.gather(201        mk_class(("a", int)), argv=["--a", "34"], log_level=logging.WARN)202    mock.assert_called_once_with(level=logging.WARN)203    assert c.a == 34204def test_log_override_argv(mocker):205    """By default logging set to INFO using basicConfig."""206    mock = mocker.patch("logging.basicConfig")207    mocker.patch("msrc.appconfig.read_all.logger.hasHandlers",208                 return_value=False)209    c, _ = msrc.appconfig.read_all.gather(210        mk_class(("a", int)), argv=["--a", "34", "-l", "warn"])211    mock.assert_called_once_with(level=logging.WARN)212    assert c.a == 34213def test_log_file_config(tmp_path, mocker):214    (tmp_path/"conf.json").write_text(all_json)215    path = (tmp_path/"conf.json").as_posix()216    mock = mocker.patch("logging.config.fileConfig")217    c, _ = msrc.appconfig.read_all.gather(218        mk_class(("a", int)),219        argv=["--a", "37", "-l", path])220    mock.assert_called_once_with(path)221    assert c.a == 37222def test_log_invalid(mocker):223    with pytest.raises(ValueError):224        msrc.appconfig.read_all.gather(225            mk_class(("a", int)),...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
