How to use the read_all method in Robot Framework

Best Python code snippets using robotframework

models.py

Source: models.py (GitHub)


# (Excerpt starts mid-file; `import pyodbc` and the `connString` connection
# string are defined above this point.)
...
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def read_all(self):
        try:
            result = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            cursor.execute(self.queries['read_all'])
            result = cursor.fetchall()
            if not result: raise Exception('(dao_network.read_all) >>> No values found.')
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def read(self, network):
        try:
            result = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            cursor.execute(self.queries['read_by_name'], network)
            result = cursor.fetchone()
            if not result: raise Exception('(dao_network.read_by_name) >>> No values found.')
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def insert(self, network):
        try:
            message = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            # Check that the network does not exist yet
            cursor.execute(self.queries['read_by_name'], network)
            exists = cursor.fetchone()
            if not exists:
                cursor.execute(self.queries['insert'], network)
                message = '(dao_network.insert) >>> network "{0}" inserted.'.format(network)
            else:
                message = '(dao_network.insert) >>> network "{0}" already exists.'.format(network)
        except pyodbc.DatabaseError:
            connection.rollback()
        else:
            connection.commit()
        finally:
            connection.autocommit = True
            connection.close()
        return message

class ContentType:
    queries = {
        'read_all': 'SELECT * FROM ContentType',
        'read_by_name': 'SELECT * FROM ContentType WHERE content_type = ?',
        'insert': 'INSERT INTO ContentType VALUES(?)',
        'get_id': 'SELECT c.id FROM ContentType c WHERE c.content_type = ?'
    }

    def get_id(self, content_type):
        try:
            result = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            cursor.execute(self.queries['get_id'], content_type)
            result = cursor.fetchone()
            if not result: raise Exception('(dao_contentType.get_id) >>> No values found.')
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def read_all(self):
        try:
            result = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            cursor.execute(self.queries['read_all'])
            result = cursor.fetchall()
            if not result: raise Exception('(dao_contentType.read_all) >>> No values found.')
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def read(self, content_type):
        try:
            result = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            cursor.execute(self.queries['read_by_name'], content_type)
            result = cursor.fetchone()
            if not result: raise Exception('(dao_contentType.read_by_name) >>> No values found.')
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def insert(self, content_type):
        try:
            message = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            # Check that it does not exist yet
            cursor.execute(self.queries['read_by_name'], content_type)
            exists = cursor.fetchone()
            if not exists:
                cursor.execute(self.queries['insert'], content_type)
                message = '(dao_contentType.insert) >>> content_type "{0}" inserted.'.format(content_type)
            else:
                message = '(dao_contentType.insert) >>> content_type "{0}" already exists.'.format(content_type)
        except pyodbc.DatabaseError:
            connection.rollback()
        else:
            connection.commit()
        finally:
            connection.autocommit = True
            connection.close()
        return message

class Hashtag:
    queries = {
        'read_all': 'SELECT * FROM Hashtag',
        'read_by_name': 'SELECT * FROM Hashtag WHERE hashtag = ?',
        'insert': 'INSERT INTO Hashtag VALUES(?)'
    }

    def read_all(self):
        try:
            result = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            cursor.execute(self.queries['read_all'])
            result = cursor.fetchall()
            if not result: raise Exception('(dao_hashtag.read_all) >>> No values found.')
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def read(self, hashtag):
        try:
            result = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            cursor.execute(self.queries['read_by_name'], hashtag)
            result = cursor.fetchone()
            if not result: raise Exception('(dao_hashtag.read_by_name) >>> No values found.')
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def insert(self, hashtag):
        try:
            message = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            # Check that it does not exist yet
            cursor.execute(self.queries['read_by_name'], hashtag)
            exists = cursor.fetchone()
            if not exists:
                cursor.execute(self.queries['insert'], hashtag)
                message = '(dao_hashtag.insert) >>> hashtag "{0}" inserted.'.format(hashtag)
            else:
                message = '(dao_hashtag.insert) >>> hashtag "{0}" already exists.'.format(hashtag)
        except pyodbc.DatabaseError:
            connection.rollback()
        else:
            connection.commit()
        finally:
            connection.autocommit = True
            connection.close()
        return message

class Entity:
    queries = {
        'read_all': 'SELECT * FROM Entity',
        'read_by_name': 'SELECT * FROM Entity WHERE ent_name = ?',
        'insert': 'INSERT INTO Entity VALUES(?)'
    }

    def read_all(self):
        try:
            result = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            cursor.execute(self.queries['read_all'])
            result = cursor.fetchall()
            if not result: raise Exception('(dao_entity.read_all) >>> No values found.')
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def read(self, entity):
        try:
            result = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            cursor.execute(self.queries['read_by_name'], entity)
            result = cursor.fetchone()
            if not result: raise Exception('(dao_entity.read_by_name) >>> No values found.')
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def insert(self, entity):
        try:
            message = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            # Check that it does not exist yet
            cursor.execute(self.queries['read_by_name'], entity)
            exists = cursor.fetchone()
            if not exists:
                cursor.execute(self.queries['insert'], entity)
                message = '(dao_entity.insert) >>> entity "{0}" inserted.'.format(entity)
            else:
                message = '(dao_entity.insert) >>> entity "{0}" already exists.'.format(entity)
        except pyodbc.DatabaseError:
            connection.rollback()
        else:
            connection.commit()
        finally:
            connection.autocommit = True
            connection.close()
        return message

class Rating:
    queries = {
        'read_all': 'SELECT * FROM Rating',
        'read_by_date': 'SELECT * FROM Rating r WHERE r.rating_date = ?',
        'insert': 'INSERT INTO Rating VALUES(?,?)'
    }

    def read_all(self):
        try:
            result = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            cursor.execute(self.queries['read_all'])
            result = cursor.fetchall()
            if not result: raise Exception('(dao_rating.read_all) >>> No values found.')
            result = [(x[0], str(x[1]), x[2]) for x in result]
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def read(self, rating_date):
        try:
            result = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            cursor.execute(self.queries['read_by_date'], rating_date)
            result = cursor.fetchone()
            if not result: raise Exception('(dao_rating.read_by_date) >>> No values found.')
            result = (result[0], str(result[1]), result[2])
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def insert(self, rating_date, points):
        try:
            message = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            # Check that it does not exist yet
            cursor.execute(self.queries['read_by_date'], rating_date)
            exists = cursor.fetchone()
            if not exists:
                cursor.execute(self.queries['insert'], rating_date, points)
                message = '(dao_rating.insert) >>> rating "{0} ~ {1}" inserted.'.format(rating_date, points)
            else:
                message = '(dao_rating.insert) >>> rating "{0} ~ {1}" already exists.'.format(rating_date, points)
        except pyodbc.DatabaseError:
            connection.rollback()
        else:
            connection.commit()
        finally:
            connection.autocommit = True
            connection.close()
        return message

class Sentiment:
    queries = {
        'read_all': 'SELECT * FROM Sentiment',
        'read_by_name': 'SELECT * FROM Sentiment s WHERE sentiment = ?',
        'insert': 'INSERT INTO Sentiment VALUES(?)',
        'get_id': 'SELECT s.id FROM Sentiment s WHERE s.sentiment = ?'
    }

    def get_id(self, sentiment):
        try:
            result = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            cursor.execute(self.queries['get_id'], sentiment)
            result = cursor.fetchone()
            if not result: raise Exception('(dao_sentiment.get_id) >>> No values found.')
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def read_all(self):
        try:
            result = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            cursor.execute(self.queries['read_all'])
            result = cursor.fetchall()
            if not result: raise Exception('(dao_sentiment.read_all) >>> No values found.')
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def read(self, sentiment):
        try:
            result = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            cursor.execute(self.queries['read_by_name'], sentiment)
            result = cursor.fetchone()
            if not result: raise Exception('(dao_sentiment.read_by_name) >>> No values found.')
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def insert(self, sentiment):
        try:
            message = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            # Check that it does not exist yet
            cursor.execute(self.queries['read_by_name'], sentiment)
            exists = cursor.fetchone()
            if not exists:
                cursor.execute(self.queries['insert'], sentiment)
                message = '(dao_sentiment.insert) >>> sentiment "{0}" inserted.'.format(sentiment)
            else:
                message = '(dao_sentiment.insert) >>> sentiment "{0}" already exists.'.format(sentiment)
        except pyodbc.DatabaseError:
            connection.rollback()
        else:
            connection.commit()
        finally:
            connection.autocommit = True
            connection.close()
        return message

class UserProfile:
    queries = {
        'read_all': 'SELECT * FROM UserProfile',
        'read_by_name': 'SELECT * FROM UserProfile WHERE profile_name = ?',
        'insert': 'INSERT INTO UserProfile VALUES(?,?)',
        'get_id': 'SELECT u.id FROM UserProfile u WHERE u.profile_name = ?'
    }

    def get_id(self, usuario):
        try:
            result = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            cursor.execute(self.queries['get_id'], usuario)
            result = cursor.fetchone()
            if not result: raise Exception('(dao_user_profile.get_id) >>> No values found.')
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def read_all(self):
        try:
            result = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            cursor.execute(self.queries['read_all'])
            result = cursor.fetchall()
            if not result: raise Exception('(dao_userProfile.read_all) >>> No values found.')
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def read(self, profile_name):
        try:
            result = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            cursor.execute(self.queries['read_by_name'], profile_name)
            result = cursor.fetchone()
            if not result: raise Exception('(dao_userProfile.read_by_name) >>> No values found.')
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def insert(self, profile_name, followers):
        try:
            message = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            # Check that it does not exist yet
            cursor.execute(self.queries['read_by_name'], profile_name)
            exists = cursor.fetchone()
            if not exists:
                cursor.execute(self.queries['insert'], profile_name, followers)
                message = '(dao_userProfile.insert) >>> user "{0} ~ {1}" inserted.'.format(profile_name, followers)
            else:
                message = '(dao_userProfile.insert) >>> user "{0} ~ {1}" already exists.'.format(profile_name, followers)
        except pyodbc.DatabaseError:
            connection.rollback()
        else:
            connection.commit()
        finally:
            connection.autocommit = True
            connection.close()
        return message

class Post:
    queries = {
        'read_all': 'SELECT * FROM Post',
        'read_by_id_message': 'SELECT * FROM Post WHERE id_message = ?',
        'insert': 'INSERT INTO Post VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)'
    }

    def read_all(self):
        try:
            result = None
            connection = pyodbc.connect(connString)
            cursor = connection.cursor()
            cursor.execute(self.queries['read_all'])
            result = cursor.fetchall()
            if not result: raise Exception('(dao_post.read_all) >>> No values found.')
        except Exception as e: print(e)
        finally:
            connection.close()
        return result

    def read(self, id_message):
        try:
            ...
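Every DAO class in this snippet implements read_all the same way: open a pyodbc connection, run the class's read_all query, fetch every row with fetchall(), and return None (after printing the error) when the table is empty or anything fails. A minimal usage sketch, assuming pyodbc is installed and noting that the connection string below is purely illustrative (the real connString is defined in the truncated part of the file):

# Hypothetical connection string; the DAO classes read a module-level connString.
connString = 'DRIVER={ODBC Driver 17 for SQL Server};SERVER=localhost;DATABASE=social;UID=user;PWD=secret'

dao = Hashtag()
rows = dao.read_all()  # list of pyodbc.Row objects, or None if the table is empty
if rows is not None:
    for row in rows:
        print(tuple(row))

Note that read_all swallows its exceptions and returns None on failure, so callers must check the result rather than catch.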


test_fileobj.py

Source: test_fileobj.py (GitHub)


1"""2Tests for load()ing and dump()ing with file-like objects.3"""4import multiprocessing5import os6import pathlib7import platform8import queue9import shutil10import socket11import socketserver12import tempfile13import traceback14import unittest15import numpy as np16import kastore as kas17IS_WINDOWS = platform.system() == "Windows"18class DictVerifyMixin:19 def verify_dicts_equal(self, d1, d2):20 self.assertEqual(sorted(d1.keys()), sorted(d2.keys()))21 for key in d1.keys():22 np.testing.assert_equal(d1[key], d2[key])23# pathlib.Path objects should work transparently, and these tests check it24# isn't broken by fileobj-enabling code.25class PathlibMixin(DictVerifyMixin):26 def setUp(self):27 fd, path = tempfile.mkstemp(prefix="kas_test_pathlib")28 os.close(fd)29 self.temp_file = pathlib.Path(path)30 def tearDown(self):31 self.temp_file.unlink()32 def test_dump_to_pathlib_Path(self):33 data = {"a": np.arange(10)}34 kas.dump(data, self.temp_file, engine=self.engine)35 def test_load_from_pathlib_Path(self):36 data = {"a": np.arange(10)}37 kas.dump(data, str(self.temp_file), engine=self.engine)38 file_size = self.temp_file.stat().st_size39 for read_all in [True, False]:40 data_out = kas.load(self.temp_file, read_all=read_all, engine=self.engine)41 data2 = dict(data_out.items())42 file_size2 = self.temp_file.stat().st_size43 self.verify_dicts_equal(data, data2)44 self.assertEqual(file_size, file_size2)45class FileobjMixin(DictVerifyMixin):46 def setUp(self):47 fd, path = tempfile.mkstemp(prefix="kas_test_fileobj")48 os.close(fd)49 self.temp_file = path50 def tearDown(self):51 os.unlink(self.temp_file)52 def test_dump_fileobj_single(self):53 data = {"a": np.arange(10)}54 with open(self.temp_file, "wb") as f:55 kas.dump(data, f, engine=self.engine)56 data_out = kas.load(self.temp_file, engine=self.engine)57 data2 = dict(data_out.items())58 self.verify_dicts_equal(data, data2)59 def test_dump_fileobj_multi(self):60 with open(self.temp_file, "wb") as f:61 for i in range(10):62 data = {63 "i" + str(i): np.arange(i, dtype=int),64 "f" + str(i): np.arange(i, dtype=float),65 }66 kas.dump(data, f, engine=self.engine)67 def test_load_fileobj_single(self):68 data = {"a": np.arange(10)}69 kas.dump(data, self.temp_file, engine=self.engine)70 file_size = os.stat(self.temp_file).st_size71 for read_all in [True, False]:72 with open(self.temp_file, "rb") as f:73 data_out = kas.load(f, read_all=read_all, engine=self.engine)74 data2 = dict(data_out.items())75 file_offset = f.tell()76 self.verify_dicts_equal(data, data2)77 self.assertEqual(file_offset, file_size)78 def test_load_and_dump_fileobj_single(self):79 data = {"a": np.arange(10)}80 with open(self.temp_file, "wb") as f:81 kas.dump(data, f, engine=self.engine)82 file_size = os.stat(self.temp_file).st_size83 for read_all in [True, False]:84 with open(self.temp_file, "rb") as f:85 data_out = kas.load(f, read_all=read_all, engine=self.engine)86 data2 = dict(data_out.items())87 file_offset = f.tell()88 self.verify_dicts_equal(data, data2)89 self.assertEqual(file_offset, file_size)90 def test_load_and_dump_fileobj_multi(self):91 datalist = [92 {93 "i" + str(i): i + np.arange(10 ** 5, dtype=int),94 "f" + str(i): i + np.arange(10 ** 5, dtype=float),95 }96 for i in range(10)97 ]98 file_offsets = []99 with open(self.temp_file, "wb") as f:100 for data in datalist:101 kas.dump(data, f, engine=self.engine)102 file_offsets.append(f.tell())103 for read_all in [True, False]:104 with open(self.temp_file, "rb") as f:105 for data, file_offset in zip(datalist, file_offsets):106 
data_out = kas.load(f, read_all=read_all, engine=self.engine)107 data2 = dict(data_out.items())108 file_offset2 = f.tell()109 self.verify_dicts_equal(data, data2)110 self.assertEqual(file_offset, file_offset2)111 def test_load_and_dump_file_single_rw(self):112 data = {"a": np.arange(10)}113 with open(self.temp_file, "r+b") as f:114 kas.dump(data, f, engine=self.engine)115 for read_all in [True, False]:116 f.seek(0)117 data_out = kas.load(f, read_all=read_all, engine=self.engine)118 data2 = dict(data_out.items())119 self.verify_dicts_equal(data, data2)120 def test_load_and_dump_file_multi_rw(self):121 datalist = [122 {123 "i" + str(i): i + np.arange(10 ** 5, dtype=int),124 "f" + str(i): i + np.arange(10 ** 5, dtype=float),125 }126 for i in range(10)127 ]128 with open(self.temp_file, "r+b") as f:129 for data in datalist:130 kas.dump(data, f, engine=self.engine)131 for read_all in [True, False]:132 f.seek(0)133 for data in datalist:134 data_out = kas.load(f, read_all=read_all, engine=self.engine)135 data2 = dict(data_out.items())136 self.verify_dicts_equal(data, data2)137 def test_load_and_dump_fd_single_rw(self):138 data = {"a": np.arange(10)}139 with open(self.temp_file, "r+b") as f:140 fd = f.fileno()141 kas.dump(data, fd, engine=self.engine)142 for read_all in [True, False]:143 os.lseek(fd, 0, os.SEEK_SET)144 data_out = kas.load(fd, read_all=read_all, engine=self.engine)145 data2 = dict(data_out.items())146 self.verify_dicts_equal(data, data2)147 def test_load_and_dump_fd_multi_rw(self):148 datalist = [149 {150 "i" + str(i): i + np.arange(10 ** 5, dtype=int),151 "f" + str(i): i + np.arange(10 ** 5, dtype=float),152 }153 for i in range(20)154 ]155 with open(self.temp_file, "r+b") as f:156 fd = f.fileno()157 for data in datalist:158 kas.dump(data, fd, engine=self.engine)159 for read_all in [True, False]:160 os.lseek(fd, 0, os.SEEK_SET)161 for data in datalist:162 data_out = kas.load(fd, read_all=read_all, engine=self.engine)163 data2 = dict(data_out.items())164 self.verify_dicts_equal(data, data2)165def dump_to_stream(q_err, q_in, file_out, engine):166 """167 Get data dicts from `q_in` and kas.dump() them to `file_out`.168 Uncaught exceptions are placed onto the `q_err` queue.169 """170 try:171 with open(file_out, "wb") as f:172 while True:173 data = q_in.get()174 if data is None:175 break176 kas.dump(data, f, engine=engine)177 except Exception as exc:178 tb = traceback.format_exc()179 q_err.put((exc, tb))180def load_from_stream(q_err, q_out, file_in, engine, read_all):181 """182 kas.load() stores from `file_in` and put their data onto `q_out`.183 Uncaught exceptions are placed onto the `q_err` queue.184 """185 try:186 with open(file_in, "rb") as f:187 while True:188 try:189 data = kas.load(f, read_all=read_all, engine=engine)190 except EOFError:191 break192 q_out.put(dict(data.items()))193 except Exception as exc:194 tb = traceback.format_exc()195 q_err.put((exc, tb))196class StreamingMixin(DictVerifyMixin):197 def setUp(self):198 self.temp_dir = tempfile.mkdtemp(prefix="kas_test_streaming")199 self.temp_fifo = os.path.join(self.temp_dir, "fifo")200 os.mkfifo(self.temp_fifo)201 def tearDown(self):202 shutil.rmtree(self.temp_dir)203 def stream(self, datalist, read_all=True):204 """205 data -> q_in -> kas.dump(..., fifo) -> kas.load(fifo) -> q_out -> data_out206 """207 q_err = multiprocessing.Queue()208 q_in = multiprocessing.Queue()209 q_out = multiprocessing.Queue()210 proc1 = multiprocessing.Process(211 target=dump_to_stream, args=(q_err, q_in, self.temp_fifo, self.engine)212 )213 proc2 = 
multiprocessing.Process(214 target=load_from_stream,215 args=(q_err, q_out, self.temp_fifo, self.engine, read_all),216 )217 proc1.start()218 proc2.start()219 for data in datalist:220 q_in.put(data)221 q_in.put(None) # signal the process that we're done222 proc1.join(timeout=3)223 if not q_err.empty():224 # re-raise the first child exception225 exc, tb = q_err.get()226 print(tb)227 raise exc228 if proc1.is_alive():229 # prevent hang if proc1 failed to join230 proc1.terminate()231 proc2.terminate()232 self.assertTrue(False, msg="proc1 (kas.dump) failed to join")233 datalist_out = []234 for _ in datalist:235 try:236 data_out = q_out.get(timeout=3)237 except queue.Empty:238 # terminate proc2 so we don't hang239 proc2.terminate()240 raise241 datalist_out.append(data_out)242 proc2.join(timeout=3)243 if proc2.is_alive():244 # prevent hang if proc2 failed to join245 proc2.terminate()246 self.assertTrue(False, msg="proc2 (kas.load) failed to join")247 self.assertEqual(len(datalist), len(datalist_out))248 for data, data_out in zip(datalist, datalist_out):249 self.verify_dicts_equal(data, data_out)250 def test_stream_single(self):251 datalist = [{"a": np.array([0])}]252 self.stream(datalist)253 def test_stream_multi(self):254 datalist = [255 {256 "i" + str(i): i + np.arange(10 ** 5, dtype=int),257 "f" + str(i): i + np.arange(10 ** 5, dtype=float),258 }259 for i in range(100)260 ]261 self.stream(datalist)262ADDRESS = ("localhost", 10009)263class TestServer(socketserver.ThreadingTCPServer):264 allow_reuse_address = True265class StoreEchoHandler(socketserver.BaseRequestHandler):266 def handle(self):267 while True:268 try:269 data = kas.load(270 self.request.fileno(), engine=self.engine, read_all=True271 )272 except EOFError:273 break274 kas.dump(dict(data), self.request.fileno(), engine=self.engine)275 # We only read one list, so shutdown the server straight away276 self.server.shutdown()277class StoreEchoHandlerCEngine(StoreEchoHandler):278 engine = kas.C_ENGINE279class StoreEchoHandlerPyEngine(StoreEchoHandler):280 engine = kas.PY_ENGINE281def server_process(engine, q):282 handlers = {283 kas.C_ENGINE: StoreEchoHandlerCEngine,284 kas.PY_ENGINE: StoreEchoHandlerPyEngine,285 }286 server = TestServer(ADDRESS, handlers[engine])287 # Tell the client (on the other end of the queue) that it's OK to open288 # a connection289 q.put(None)290 server.serve_forever()291class SocketMixin(DictVerifyMixin):292 def setUp(self):293 # Use a queue to synchronise the startup of the server and the client.294 q = multiprocessing.Queue()295 self.server_process = multiprocessing.Process(296 target=server_process, args=(self.engine, q)297 )298 self.server_process.start()299 q.get()300 self.client = socket.create_connection(ADDRESS)301 def tearDown(self):302 self.client.close()303 self.server_process.join()304 def verify_stream(self, data_list):305 fd = self.client.fileno()306 for data in data_list:307 kas.dump(data, fd, engine=self.engine)308 echo_data = kas.load(fd, read_all=True, engine=self.engine)309 self.verify_dicts_equal(data, echo_data)310 def test_single(self):311 self.verify_stream([{"a": np.arange(10)}])312 def test_two(self):313 self.verify_stream([{"a": np.zeros(10)}, {"b": np.zeros(100)}])314 def test_multi(self):315 datalist = [316 {317 "i" + str(i): i + np.arange(10 ** 5, dtype=int),318 "f" + str(i): i + np.arange(10 ** 5, dtype=float),319 }320 for i in range(10)321 ]322 self.verify_stream(datalist)323class TestPathlibCEngine(PathlibMixin, unittest.TestCase):324 engine = kas.C_ENGINE325class 
TestPathlibPyEngine(PathlibMixin, unittest.TestCase):326 engine = kas.PY_ENGINE327class TestFileobjCEngine(FileobjMixin, unittest.TestCase):328 engine = kas.C_ENGINE329class TestFileobjPyEngine(FileobjMixin, unittest.TestCase):330 engine = kas.PY_ENGINE331@unittest.skipIf(IS_WINDOWS, "FIFOs don't exist on Windows")332class TestStreamingCEngine(StreamingMixin, unittest.TestCase):333 engine = kas.C_ENGINE334@unittest.skipIf(IS_WINDOWS, "FIFOs don't exist on Windows")335class TestStreamingPyEngine(StreamingMixin, unittest.TestCase):336 engine = kas.PY_ENGINE337@unittest.skipIf(IS_WINDOWS, "Deadlocking on Windows")338class TestSocketCEngine(SocketMixin, unittest.TestCase):339 engine = kas.C_ENGINE340@unittest.skipIf(IS_WINDOWS, "Deadlocking on Windows")341class TestSocketPyEngine(SocketMixin, unittest.TestCase):...
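These tests exercise kastore's read_all flag in both states: read_all=True reads the whole store into memory immediately, which is what allows loading from pipes, FIFOs, and sockets, while read_all=False (the default) reads arrays lazily and needs a seekable file. A minimal sketch of the difference, assuming kastore and numpy are installed and using an illustrative file name:

import numpy as np
import kastore as kas

kas.dump({"a": np.arange(10)}, "example.kas")

# Eager: all arrays are materialised up front.
eager = kas.load("example.kas", read_all=True)

# Lazy (the default): arrays are read from the file on first access.
lazy = kas.load("example.kas")

print(np.array_equal(eager["a"], lazy["a"]))  # True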


test_errors.py

Source: test_errors.py (GitHub)


1"""2Tests for error conditions.3"""4import os5import platform6import struct7import tempfile8import unittest9import numpy as np10import kastore as kas11import kastore.store as store12IS_WINDOWS = platform.system() == "Windows"13class InterfaceMixin:14 """15 Exercise the low-level interface.16 """17 def setUp(self):18 fd, path = tempfile.mkstemp(prefix="kas_test_errors")19 os.close(fd)20 self.temp_file = path21 def tearDown(self):22 os.unlink(self.temp_file)23 def test_bad_dicts(self):24 for bad_dict in [[], "w34", None, 1]:25 self.assertRaises(26 TypeError, kas.dump, bad_dict, self.temp_file, engine=self.engine27 )28 self.assertRaises(29 TypeError, kas.dump, bad_dict, self.temp_file, engine=self.engine30 )31 def test_bad_filename_type(self):32 for bad_filename in [[], None, {}]:33 self.assertRaises(TypeError, kas.dump, {}, bad_filename, engine=self.engine)34 self.assertRaises(TypeError, kas.dump, {}, bad_filename, engine=self.engine)35 self.assertRaises(TypeError, kas.load, bad_filename, engine=self.engine)36 self.assertRaises(TypeError, kas.load, bad_filename, engine=self.engine)37 def test_bad_keys(self):38 a = np.zeros(1)39 for bad_key in [(1234,), b"1234", None, 1234]:40 with self.assertRaises(TypeError):41 kas.dump({bad_key: a}, self.temp_file, engine=self.engine)42 def test_bad_arrays(self):43 kas.dump({"a": []}, self.temp_file, engine=self.engine)44 for bad_array in [kas, lambda x: x, "1234", None, [[0, 1], [0, 2]]]:45 self.assertRaises(46 ValueError,47 kas.dump,48 {"a": bad_array},49 self.temp_file,50 engine=self.engine,51 )52 # TODO add tests for arrays in fortran order and so on.53 def test_file_not_found(self):54 a = np.zeros(1)55 for bad_file in ["no_such_file", "/no/such/file"]:56 self.assertRaises(FileNotFoundError, kas.load, bad_file, engine=self.engine)57 self.assertRaises(58 FileNotFoundError, kas.dump, {"a": a}, "/no/such/file", engine=self.engine59 )60 def test_file_is_a_directory(self):61 tmp_dir = tempfile.mkdtemp()62 try:63 exception = IsADirectoryError64 if IS_WINDOWS:65 exception = PermissionError66 self.assertRaises(67 exception, kas.dump, {"a": []}, tmp_dir, engine=self.engine68 )69 self.assertRaises(exception, kas.load, tmp_dir, engine=self.engine)70 finally:71 os.rmdir(tmp_dir)72class TestInterfacePyEngine(InterfaceMixin, unittest.TestCase):73 engine = kas.PY_ENGINE74class TestInterfaceCEngine(InterfaceMixin, unittest.TestCase):75 engine = kas.C_ENGINE76class TestEngines(unittest.TestCase):77 """78 Check that we correctly identify bad engines79 """80 bad_engines = [None, {}, "no such engine", b"not an engine"]81 def test_bad_engine_dump(self):82 for bad_engine in self.bad_engines:83 self.assertRaises(ValueError, kas.dump, {}, "", engine=bad_engine)84 def test_bad_engine_load(self):85 for bad_engine in self.bad_engines:86 self.assertRaises(ValueError, kas.load, "", engine=bad_engine)87class FileFormatsMixin:88 """89 Common utilities for tests on the file format.90 """91 def setUp(self):92 fd, path = tempfile.mkstemp(prefix="kas_malformed_files")93 os.close(fd)94 self.temp_file = path95 def tearDown(self):96 os.unlink(self.temp_file)97 def write_file(self, num_items=0):98 data = {}99 for j in range(num_items):100 data["a" * (j + 1)] = np.arange(j)101 kas.dump(data, self.temp_file)102class MalformedFilesMixin(FileFormatsMixin):103 """104 Tests for various types of malformed intput files.105 """106 def test_empty_file(self):107 with open(self.temp_file, "w"):108 pass109 self.assertEqual(os.path.getsize(self.temp_file), 0)110 self.assertRaises(111 EOFError,112 
kas.load,113 self.temp_file,114 engine=self.engine,115 read_all=self.read_all,116 )117 def test_bad_magic(self):118 self.write_file()119 with open(self.temp_file, "rb") as f:120 buff = bytearray(f.read())121 before_len = len(buff)122 buff[0:8] = b"12345678"123 self.assertEqual(len(buff), before_len)124 with open(self.temp_file, "wb") as f:125 f.write(buff)126 self.assertRaises(127 kas.FileFormatError,128 kas.load,129 self.temp_file,130 engine=self.engine,131 read_all=self.read_all,132 )133 def test_bad_file_size(self):134 for num_items in range(10):135 for offset in [-2, -1, 1, 2 ** 10]:136 self.write_file(num_items)137 file_size = os.path.getsize(self.temp_file)138 with open(self.temp_file, "rb") as f:139 buff = bytearray(f.read())140 before_len = len(buff)141 buff[16:24] = struct.pack("<Q", file_size + offset)142 self.assertEqual(len(buff), before_len)143 with open(self.temp_file, "wb") as f:144 f.write(buff)145 with self.assertRaises(kas.FileFormatError):146 kas.load(self.temp_file, engine=self.engine, read_all=self.read_all)147 def test_truncated_file_descriptors(self):148 for num_items in range(2, 5):149 self.write_file(num_items)150 with open(self.temp_file, "rb") as f:151 buff = bytearray(f.read())152 with open(self.temp_file, "wb") as f:153 f.write(buff[: num_items * store.ITEM_DESCRIPTOR_SIZE - 1])154 with self.assertRaises(kas.FileFormatError):155 kas.load(self.temp_file, engine=self.engine, read_all=self.read_all)156 def test_truncated_file_data(self):157 for num_items in range(2, 5):158 self.write_file(num_items)159 with open(self.temp_file, "rb") as f:160 buff = bytearray(f.read())161 with open(self.temp_file, "wb") as f:162 f.write(buff[:-1])163 with self.assertRaises(kas.FileFormatError):164 # Must call dict to ensure all the keys are loaded.165 dict(166 kas.load(self.temp_file, engine=self.engine, read_all=self.read_all)167 )168 def test_bad_item_types(self):169 items = {"a": []}170 descriptors, file_size = store.pack_items(items)171 num_types = len(store.np_dtype_to_type_map)172 for bad_type in [num_types + 1, 2 * num_types]:173 with open(self.temp_file, "wb") as f:174 descriptors[0].type = bad_type175 store.write_file(f, descriptors, file_size)176 with self.assertRaises(kas.FileFormatError):177 kas.load(self.temp_file, engine=self.engine, read_all=self.read_all)178 def test_bad_key_initial_offsets(self):179 items = {"a": np.arange(100)}180 # First key offset must be at header_size + n * (descriptor_size)181 for offset in [-1, +1, 2, 100]:182 # First key offset must be at header_size + n * (descriptor_size)183 descriptors, file_size = store.pack_items(items)184 descriptors[0].key_start += offset185 with open(self.temp_file, "wb") as f:186 store.write_file(f, descriptors, file_size)187 self.assertRaises(188 kas.FileFormatError,189 kas.load,190 self.temp_file,191 engine=self.engine,192 read_all=self.read_all,193 )194 def test_bad_key_non_sequential(self):195 items = {"a": np.arange(100), "b": []}196 # Keys must be packed sequentially.197 for offset in [-1, +1, 2, 100]:198 descriptors, file_size = store.pack_items(items)199 descriptors[1].key_start += offset200 with open(self.temp_file, "wb") as f:201 store.write_file(f, descriptors, file_size)202 self.assertRaises(203 kas.FileFormatError,204 kas.load,205 self.temp_file,206 engine=self.engine,207 read_all=self.read_all,208 )209 def test_bad_array_initial_offset(self):210 items = {"a": np.arange(100)}211 for offset in [-100, -1, +1, 2, 8, 16, 100]:212 # First key offset must be at header_size + n * (descriptor_size)213 
descriptors, file_size = store.pack_items(items)214 descriptors[0].array_start += offset215 with open(self.temp_file, "wb") as f:216 store.write_file(f, descriptors, file_size)217 self.assertRaises(218 kas.FileFormatError,219 kas.load,220 self.temp_file,221 engine=self.engine,222 read_all=self.read_all,223 )224 def test_bad_array_non_sequential(self):225 items = {"a": np.arange(100), "b": []}226 for offset in [-1, 1, 2, -8, 8, 100]:227 descriptors, file_size = store.pack_items(items)228 descriptors[1].array_start += offset229 with open(self.temp_file, "wb") as f:230 store.write_file(f, descriptors, file_size)231 self.assertRaises(232 kas.FileFormatError,233 kas.load,234 self.temp_file,235 engine=self.engine,236 read_all=self.read_all,237 )238 def test_bad_array_alignment(self):239 items = {"a": np.arange(100, dtype=np.int8), "b": []}240 descriptors, file_size = store.pack_items(items)241 descriptors[0].array_start += 1242 descriptors[0].array_len -= 1243 with open(self.temp_file, "wb") as f:244 store.write_file(f, descriptors, file_size)245 with self.assertRaises(kas.FileFormatError):246 kas.load(self.temp_file, engine=self.engine, read_all=self.read_all)247 def test_bad_array_packing(self):248 items = {"a": np.arange(100, dtype=np.int8), "b": []}249 descriptors, file_size = store.pack_items(items)250 descriptors[0].array_start += 8251 descriptors[0].array_len -= 8252 with open(self.temp_file, "wb") as f:253 store.write_file(f, descriptors, file_size)254 with self.assertRaises(kas.FileFormatError):255 kas.load(self.temp_file, engine=self.engine, read_all=self.read_all)256class TestMalformedFilesPyEngine(MalformedFilesMixin, unittest.TestCase):257 read_all = False258 engine = kas.PY_ENGINE259class TestMalformedFilesCEngine(MalformedFilesMixin, unittest.TestCase):260 read_all = False261 engine = kas.C_ENGINE262class TestMalformedFilesPyEngineReadAll(MalformedFilesMixin, unittest.TestCase):263 read_all = True264 engine = kas.PY_ENGINE265class TestMalformedFilesCEngineReadAll(MalformedFilesMixin, unittest.TestCase):266 read_all = True267 engine = kas.C_ENGINE268class FileVersionsMixin(FileFormatsMixin):269 """270 Tests for the file major version.271 """272 def verify_major_version(self, version):273 self.write_file()274 with open(self.temp_file, "rb") as f:275 buff = bytearray(f.read())276 before_len = len(buff)277 buff[8:10] = struct.pack("<H", version)278 self.assertEqual(len(buff), before_len)279 with open(self.temp_file, "wb") as f:280 f.write(buff)281 kas.load(self.temp_file, engine=self.engine, read_all=self.read_all)282 def test_major_version_too_old(self):283 self.assertRaises(284 kas.VersionTooOldError, self.verify_major_version, store.VERSION_MAJOR - 1285 )286 def test_major_version_too_new(self):287 for j in range(1, 5):288 self.assertRaises(289 kas.VersionTooNewError,290 self.verify_major_version,291 store.VERSION_MAJOR + j,292 )293class TestFileVersionsPyEngine(FileVersionsMixin, unittest.TestCase):294 engine = kas.PY_ENGINE295 read_all = False296class TestFileVersionsPyEngineReadAll(FileVersionsMixin, unittest.TestCase):297 engine = kas.PY_ENGINE298 read_all = True299class TestFileVersionsCEngine(FileVersionsMixin, unittest.TestCase):300 engine = kas.C_ENGINE301 read_all = False302class TestFileVersionsCEngineReadAll(FileVersionsMixin, unittest.TestCase):303 engine = kas.C_ENGINE...
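Every malformed-file case here is run under both read_all=True and read_all=False (via the four Test* subclasses near the bottom), since the eager and lazy code paths validate the file separately. A small sketch of the failure mode being exercised, assuming kastore is installed and corrupt.kas is an illustrative damaged file:

import kastore as kas

for read_all in (True, False):
    try:
        data = kas.load("corrupt.kas", read_all=read_all)
        dict(data)  # force every key to load in the lazy case
    except (EOFError, kas.FileFormatError) as err:
        print(f"read_all={read_all}: {type(err).__name__}: {err}")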


gather_test.py

Source: gather_test.py (GitHub)


import logging

import pytest

import msrc.appconfig.read_all
from common import AllTypes, WithDefaults
from common import all_json, all_types_instance, all_values, all_args
from common import all_env, mk_class

# Normal usage
# ============

def test_builtin_defaults():
    "No data required when all values have defaults."
    config, _ = msrc.appconfig.read_all.gather(WithDefaults)
    assert (999,) == config
    with pytest.raises(RuntimeError):
        msrc.appconfig.read_all.gather(AllTypes)

def test_override_defaults():
    "All values can come from `override_defaults`."
    loaded, _ = msrc.appconfig.read_all.gather(
        AllTypes, override_defaults=all_values)
    assert loaded == all_types_instance

def test_override_builtin_defaults():
    "`override_defaults` overrides built-in defaults."
    config, _ = msrc.appconfig.read_all.gather(
        WithDefaults,
        override_defaults=dict(a=1))
    assert (1,) == config

def test_config_file(tmp_path):
    "All values come from a configuration file in `config_files`."
    (tmp_path/"conf.json").write_text(all_json)
    path = (tmp_path/"conf.json").as_posix()
    config, _ = msrc.appconfig.read_all.gather(AllTypes, config_files=[path])
    assert all_types_instance == config

def test_config_file_argv(tmp_path):
    "All values come from a configuration file in `argv`."
    (tmp_path/"conf.json").write_text(all_json)
    path = (tmp_path/"conf.json").as_posix()
    config, _ = msrc.appconfig.read_all.gather(AllTypes, argv=["-c", path])
    assert all_types_instance == config

def test_config_file_sysargv(tmp_path, mocker):
    "All values come from a configuration file in `sys.argv`."
    (tmp_path/"conf.json").write_text(all_json)
    path = (tmp_path/"conf.json").as_posix()
    mocker.patch("sys.argv", new=["script", "-c", path])
    config, _ = msrc.appconfig.read_all.gather(AllTypes)
    assert all_types_instance == config

def test_config_file_combines_argv(mocker):
    "Option in `argv` adds to paths in `config_files`."
    from_file = mocker.patch("msrc.appconfig.read_all._from_file")
    config, _ = msrc.appconfig.read_all.gather(
        WithDefaults,
        config_files=["a"],
        argv=["-c", "b"])
    assert from_file.call_count == 2
    # assert call_arg_list

def test_argv_overrides_sysargv(tmp_path, mocker):
    "Option in `argv` prevents looking at `sys.argv`."
    (tmp_path/"conf.json").write_text(all_json)
    path = (tmp_path/"conf.json").as_posix()
    mocker.patch("sys.argv", new=["script", "-c", "nofile.yml"])
    config, _ = msrc.appconfig.read_all.gather(AllTypes, argv=["-c", path])
    assert all_types_instance == config

def test_config_file_overrides_builtin(tmp_path):
    "Values from `config_files` override built-in defaults."
    (tmp_path/"conf.yaml").write_text("a: 2")
    path = (tmp_path/"conf.yaml").as_posix()
    config, _ = msrc.appconfig.read_all.gather(
        WithDefaults, config_files=[path])
    assert (2,) == config

def test_config_file_overrides_defaults(tmp_path):
    "Values from `config_files` override `override_defaults`."
    (tmp_path/"conf.yaml").write_text("a: 2")
    path = (tmp_path/"conf.yaml").as_posix()
    config, _ = msrc.appconfig.read_all.gather(
        WithDefaults,
        override_defaults=dict(a=1),
        config_files=[path])
    assert (2,) == config

def test_env(mocker):
    "All values come from shell environment."
    mocker.patch("os.environ", new=all_env)
    config, _ = msrc.appconfig.read_all.gather(AllTypes, env_var_prefix="PRE_")
    assert all_types_instance == config

def test_env_overrides_config_file(tmp_path, mocker):
    "Values from env override config files."
    (tmp_path/"conf.yaml").write_text("a: 2")
    path = (tmp_path/"conf.yaml").as_posix()
    mocker.patch("sys.argv", new=["script", "-c", path])
    mocker.patch("os.environ", new=dict(_a="3"))
    config, _ = msrc.appconfig.read_all.gather(
        WithDefaults,
        override_defaults=dict(a=1),
        env_var_prefix="_")
    assert (3,) == config

def test_argv():
    "All values come from argv."
    config, _ = msrc.appconfig.read_all.gather(AllTypes, argv=all_args)
    assert all_types_instance == config

def test_arg_overrides_env(tmp_path, mocker):
    "Values from argv override env."
    (tmp_path/"conf.yaml").write_text("a: 2")
    path = (tmp_path/"conf.yaml").as_posix()
    mocker.patch("sys.argv", new=["script", "-c", path, "--a", "4"])
    mocker.patch("os.environ", new=dict(_a="3"))
    config, _ = msrc.appconfig.read_all.gather(
        WithDefaults,
        override_defaults=dict(a=1),
        env_var_prefix="_")
    assert (4,) == config

# Exceptions
# ==========

def test_gather_invalid_defaults():
    "Data in `override_defaults` must be parseable."
    with pytest.raises(ValueError):
        msrc.appconfig.read_all.gather(
            AllTypes,
            override_defaults=dict(nested=0))

def test_config_file_not_found():
    "Config file must exist."
    with pytest.raises(FileNotFoundError):
        msrc.appconfig.read_all.gather(AllTypes, config_files=["nofile.yml"])

# Help
# =====

def test_help(capsys, mocker):
    mocker.patch("sys.argv", new=["script", "-h", "--a", "4"])
    with pytest.raises(SystemExit):
        msrc.appconfig.read_all.gather(AllTypes, arg_aliases=dict(o="option"))
    captured = capsys.readouterr()
    assert captured.err == ''
    expected = [
        "--string STR",
        "--integer INT",
        "--fractional FLOAT",
        "--boolean [BOOL]",
        "-o En, --option En",
        "--strings STR STR",
        "--integers INT INT",
        "--fractionals FLOAT FLOAT",
        "--nested.booleans BOOL BOOL (*)",
        "--nested.options En En"
    ]
    actual = captured.out.strip('\r\n').splitlines()
    assert actual[-len(expected):] == expected

def test_help_arg(capsys, mocker):
    mocker.patch("sys.argv", new=["script", "-h", "nested.booleans"])
    with pytest.raises(SystemExit):
        msrc.appconfig.read_all.gather(AllTypes)
    captured = capsys.readouterr()
    assert captured.err == ''
    actual = captured.out.strip('\r\n').splitlines()
    assert actual[1] == AllTypes.__annotations__[
        "nested"]._field_help["booleans"]

def test_nohelp_arg(capsys, mocker):
    mocker.patch("sys.argv", new=["script", "-h", "string"])
    with pytest.raises(SystemExit):
        msrc.appconfig.read_all.gather(AllTypes)
    captured = capsys.readouterr()
    assert captured.err == ''
    actual = captured.out.strip('\r\n').splitlines()
    assert actual[0] == "--string STR"
    assert actual[1].startswith('(')

def test_help_noarg(capsys, mocker):
    mocker.patch("sys.argv", new=["script", "-h", "nested.b"])
    with pytest.raises(SystemExit):
        msrc.appconfig.read_all.gather(AllTypes)
    captured = capsys.readouterr()
    assert captured.err == ''
    actual = captured.out.strip('\r\n').splitlines()
    assert actual[0].index("nested.b") > 0

def test_no_help(mocker):
    schema = mk_class(("help", str), ("hero", bool))
    mocker.patch("sys.argv", new=["script", "-h", "--help", "test"])
    actual, _ = msrc.appconfig.read_all.gather(
        schema, arg_aliases=dict(h="hero"))
    assert actual.hero
    assert actual.help == "test"

# Logging
# ========

def test_log_default(mocker):
    """By default logging is set to INFO using basicConfig."""
    mock = mocker.patch("logging.basicConfig")
    mocker.patch("msrc.appconfig.read_all.logger.hasHandlers",
                 return_value=False)
    c, _ = msrc.appconfig.read_all.gather(
        mk_class(("a", int)), argv=["--a", "34"])
    mock.assert_called_once_with(level=logging.INFO)
    assert c.a == 34

def test_log_preconfigured(mocker):
    """If logging is set up just set level for the package logger."""
    mock = mocker.patch("msrc.appconfig.read_all.logger.setLevel")
    mocker.patch("msrc.appconfig.read_all.logger.hasHandlers",
                 return_value=True)
    c, _ = msrc.appconfig.read_all.gather(
        mk_class(("a", int)), argv=["--a", "34"])
    mock.assert_called_once_with(logging.INFO)
    assert c.a == 34

def test_log_override_log_level(mocker):
    """An explicit log_level overrides the INFO default."""
    mock = mocker.patch("logging.basicConfig")
    mocker.patch("msrc.appconfig.read_all.logger.hasHandlers",
                 return_value=False)
    c, _ = msrc.appconfig.read_all.gather(
        mk_class(("a", int)), argv=["--a", "34"], log_level=logging.WARN)
    mock.assert_called_once_with(level=logging.WARN)
    assert c.a == 34

def test_log_override_argv(mocker):
    """The -l option overrides the INFO default."""
    mock = mocker.patch("logging.basicConfig")
    mocker.patch("msrc.appconfig.read_all.logger.hasHandlers",
                 return_value=False)
    c, _ = msrc.appconfig.read_all.gather(
        mk_class(("a", int)), argv=["--a", "34", "-l", "warn"])
    mock.assert_called_once_with(level=logging.WARN)
    assert c.a == 34

def test_log_file_config(tmp_path, mocker):
    (tmp_path/"conf.json").write_text(all_json)
    path = (tmp_path/"conf.json").as_posix()
    mock = mocker.patch("logging.config.fileConfig")
    c, _ = msrc.appconfig.read_all.gather(
        mk_class(("a", int)),
        argv=["--a", "37", "-l", path])
    mock.assert_called_once_with(path)
    assert c.a == 37

def test_log_invalid(mocker):
    with pytest.raises(ValueError):
        msrc.appconfig.read_all.gather(
            mk_class(("a", int)),
            ...


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Robot Framework automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 minutes of automation testing for free!
