Best Python code snippet using Airtest
test_storage.py
Source:test_storage.py  
1'''2Created on Sep 20, 20143@author: ivan4'''5import unittest6import tempfile7import os8from xapi_back.storage import Storage9import shutil10import time11import uuid12TEST_FILE= '/usr/bin/python'13def gen_uuid():14    return str(uuid.uuid4())15def write_sample_file( s):16            w = s.get_writer()17            with open(TEST_FILE,'rb') as f:18                while True:19                    read= f.read(10000)20                    if not read:21                        break22                    w.write(read)23            s.close()24class Test(unittest.TestCase):25    def setUp(self):26        self.dir=tempfile.mkdtemp()27    28    def test_uuid(self):29        sr=Storage(self.dir, 2,compression_level= 0)30        uid=gen_uuid()31        rack=sr.get_rack_for('test',uid )32        slot=rack.create_slot()33        write_sample_file(slot)34        slot.close()35        uid2=gen_uuid()36        rack=sr.get_rack_for('test',uid2 )37        self.assertEqual(rack.last_slot, None)38        self.assertTrue(os.path.exists(rack._path))39        uid3=gen_uuid()40        rack=sr.get_rack_for('test',uid3,exists=True)41        self.assertFalse(rack)42        43        try:44            r=sr.find_rack_for('test')45            self.fail('Should raise exception')46        except Storage.NotFound:47            pass48        49        try:50            r=sr.find_rack_for('test', '123')51            self.fail('Should raise exception')52        except Storage.NotFound:53            pass54        55        r=sr.find_rack_for('test', uid)56        self.assertTrue(r)57        s=r.last_slot58        self.assertTrue(s)59        60        r=sr.find_rack_for('test', uid[:5].upper())61        self.assertTrue(r)62        s=r.last_slot63        self.assertTrue(s)64        65        r=sr.find_rack_for('test', uid2)66        self.assertTrue(r)67        s=r.last_slot68        self.assertFalse(s)69        70        71        72    def test_compress(self):73        
test_file_size=os.stat(TEST_FILE).st_size74        sr=Storage(self.dir, 2,compression_level= 0)75        uid=gen_uuid()76        rack=sr.get_rack_for('test',uid )77        slot=rack.create_slot()78        write_sample_file(slot)79        res= rack.last_slot80        self.assertEqual(res._comp_method, 'client')81        self.assertEqual(test_file_size, res.size_uncompressed)82        self.assertTrue(test_file_size < res.size)83        84        sr=Storage(self.dir, 2,compression_level= 1)85        rack=sr.get_rack_for('test', uid)86        slot=rack.create_slot()87        write_sample_file(slot)88        res= rack.last_slot89        self.assertEqual(test_file_size, res.size_uncompressed)90        self.assertTrue(test_file_size > res.size)91        comp_size = res.size92        93        sr=Storage(self.dir, 2,compression_level= 9)94        rack=sr.get_rack_for('test', uid)95        slot=rack.create_slot()96        write_sample_file(slot)97        res= rack.last_slot98        self.assertEqual(test_file_size, res.size_uncompressed)99        self.assertTrue(comp_size > res.size)100        101    def test_no_compress(self):102        test_file_size=os.stat(TEST_FILE).st_size103        sr=Storage(self.dir, 2,compression_method= None)104        uid=gen_uuid()105        rack=sr.get_rack_for('test', uid)106        slot=rack.create_slot()107        write_sample_file(slot)108        res= rack.last_slot109        self.assertEqual(res._comp_method, None)110        self.assertEqual(test_file_size, res.size_uncompressed)111        self.assertEqual(test_file_size , res.size)112        113        self.assertTrue(slot._path.endswith('.xva'))114        self.assertTrue(res._path.endswith('.xva'))115        self.assertTrue(res.duration)116        print 'duration',res.duration117        self.assertTrue(res.created)118        print 'created', res.created119        120        st=sr.get_status()121        self.assertTrue(st[uid])122        123    def test_no_compress_server(self):124       
 test_file_size=os.stat(TEST_FILE).st_size                125        sr=Storage(self.dir, 2,compression_method= 'server')126        uid=gen_uuid()127        rack=sr.get_rack_for('test',uid)128        slot=rack.create_slot()129        write_sample_file(slot)130        res= rack.last_slot131        self.assertEqual(res._comp_method, 'server')132        self.assertEqual(test_file_size, res.size_uncompressed)133        self.assertEqual(test_file_size , res.size)134        135        self.assertTrue(slot._path.endswith('.xva.gz'))136        self.assertTrue(res._path.endswith('.xva.gz'))137        138    def test_basic(self):139        sr=Storage(self.dir, 2)140        uid=gen_uuid()141        rack=sr.get_rack_for('test', uid)142        slot=rack.create_slot()143        msg='Hi There Is Anybody In There'144        slot.get_writer().write(msg)145        time.sleep(0.001)146        slot.close()147        self.assertEqual(1, len(rack.get_all_slots()))148        m=rack.last_slot149        self.assertEqual(len(msg), m.size_uncompressed)150        self.assertTrue(m.duration>0)151        m.close()152        time.sleep(0.01)153        slot=rack.create_slot()154        slot.get_writer().write('Second')155        slot.close()156        self.assertEqual(2, len(rack.get_all_slots()))157        slot=rack.last_slot158        r=slot.get_reader().read()159        slot.close()160        self.assertEqual('Second',r)161        162        slot=rack.create_slot()163        w=slot.get_writer()164        w.write('aaa')165        w.close()166        self.assertEqual(2, len(rack.get_all_slots()))167        168        time.sleep(0.01)169        slot=rack.create_slot()170        slot.get_writer().write('Third')171        slot.close()172        self.assertEqual(3, len(rack.get_all_slots()))173        slot=rack.last_slot174        r=slot.get_reader().read()175        slot.close()176        self.assertEqual('Third',r)177        178        rack.shrink()179        self.assertEqual(2, 
len(rack.get_all_slots()))180        181        r=slot.get_reader().read()182        slot.close()183        self.assertEqual('Third',r)184    def tearDown(self):185       shutil.rmtree(self.dir)186if __name__ == "__main__":187    #import sys;sys.argv = ['', 'Test.testName']...test_cachefile.py
Source:test_cachefile.py  
# -*- coding: utf-8 -*-
"""
/*
 * Copyright (c) 2019 EasyDo, Inc. <panjunyong@easydo.cn>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
"""
from __future__ import division

import math
import os
import string
from random import choice, randint
from shutil import rmtree
from tempfile import mkdtemp

import pytest

from libs.easydo_fs.cachefile import BUFF_SIZE, CacheFile

# 9.5 MiB: deliberately not a whole number of 1 MiB blocks.
TEST_FILE_SIZE = int(math.floor(1024 * 1024 * 9.5))
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')


@pytest.fixture(scope='session', autouse=True)
def data_bin():
    """Create DATA_DIR/data.bin with random digits and yield it opened 'r+b'.

    The file is TEST_FILE_SIZE bytes long and serves as the reference
    ("origin") content that cache reads are compared against.
    """
    fname = 'data.bin'
    fpath = os.path.join(DATA_DIR, fname)
    with open(fpath, 'w+b') as f:
        random_digits = [choice(string.digits) for _ in range(TEST_FILE_SIZE)]
        # The file is opened in binary mode, so write bytes explicitly;
        # encoding ASCII digits is a no-op on Python 2 and required on 3.
        f.write(''.join(random_digits).encode('ascii'))
    with open(fpath, 'r+b') as f:
        yield f


@pytest.fixture()
def tempd():
    """Yield a fresh temporary directory and remove it after the test."""
    path = mkdtemp('.pytest-cachefile')
    assert os.path.isdir(path)
    yield path
    rmtree(path)


@pytest.fixture()
def cachefile(tempd, data_bin):
    """Yield a CacheFile whose origin reader serves bytes from data_bin."""
    def _read_origin(start, end):
        data_bin.seek(start)
        return data_bin.read(end - start)

    uid = 12345
    revision = 1
    file_size = TEST_FILE_SIZE  # 9.5MB total size
    cache_dir = tempd
    yield CacheFile(uid, revision, file_size, cache_dir, _read_origin)


def test_read_1_byte(cachefile, data_bin):
    """Reading [0, 1) returns the first byte of the origin file."""
    ret = cachefile.read(0, 1)
    assert len(ret) == 1
    data_bin.seek(0)
    assert ret == data_bin.read(1)


def test_read_1_block(cachefile, data_bin):
    """Reading exactly BUFF_SIZE bytes from offset 0 matches the origin."""
    ret = cachefile.read(0, BUFF_SIZE)
    assert len(ret) == BUFF_SIZE
    data_bin.seek(0)
    assert ret == data_bin.read(BUFF_SIZE)


def test_read_1_block_plus_1_byte(cachefile, data_bin):
    """Reading BUFF_SIZE + 1 bytes from offset 0 matches the origin."""
    ret = cachefile.read(0, BUFF_SIZE + 1)
    assert len(ret) == BUFF_SIZE + 1
    data_bin.seek(0)
    assert ret == data_bin.read(BUFF_SIZE + 1)


def test_read_2_block(cachefile, data_bin):
    """Reading 2 * BUFF_SIZE bytes from offset 0 matches the origin."""
    ret = cachefile.read(0, BUFF_SIZE * 2)
    assert len(ret) == BUFF_SIZE * 2
    data_bin.seek(0)
    assert ret == data_bin.read(BUFF_SIZE * 2)


def test_read_3_block(cachefile, data_bin):
    """Reading 3 * BUFF_SIZE bytes from offset 0 matches the origin."""
    ret = cachefile.read(0, BUFF_SIZE * 3)
    assert len(ret) == BUFF_SIZE * 3
    data_bin.seek(0)
    assert ret == data_bin.read(BUFF_SIZE * 3)


def test_read_eof(cachefile, data_bin):
    """Reading starting at the end of the file returns no data."""
    ret = cachefile.read(TEST_FILE_SIZE, TEST_FILE_SIZE + 1)
    assert len(ret) == 0
    data_bin.seek(TEST_FILE_SIZE)
    assert ret == data_bin.read(1)


def test_read_last_byte(cachefile, data_bin):
    """Reading the final byte of the file returns exactly one byte."""
    ret = cachefile.read(TEST_FILE_SIZE - 1, TEST_FILE_SIZE)
    assert len(ret) == 1
    data_bin.seek(TEST_FILE_SIZE - 1)
    assert ret == data_bin.read(1)


def test_read_over_filesize(cachefile, data_bin):
    """A range that runs past the end is cut off at the file size."""
    ret = cachefile.read(TEST_FILE_SIZE - 1, TEST_FILE_SIZE + 1)
    assert len(ret) == 1
    data_bin.seek(TEST_FILE_SIZE - 1)
    assert ret == data_bin.read(2)


def test_read_last_block_to_eof(cachefile, data_bin):
    """Reading a full 1 MiB from the last block start stops at EOF."""
    # Start of the last (partial) 1 MiB block. The parentheses matter:
    # without them `*` and `//` associate left to right and the expression
    # collapses to TEST_FILE_SIZE, i.e. a zero-length read at EOF.
    offset = 1024 * 1024 * (TEST_FILE_SIZE // (1024 * 1024))
    read_size = 1024 * 1024
    ret = cachefile.read(offset, offset + read_size)
    assert len(ret) == TEST_FILE_SIZE - offset
    assert len(ret) <= read_size
    data_bin.seek(offset)
    assert ret == data_bin.read(read_size)


def test_read_all(cachefile, data_bin):
    """Reading [0, TEST_FILE_SIZE) returns the whole origin file."""
    ret = cachefile.read(0, TEST_FILE_SIZE)
    assert len(ret) == TEST_FILE_SIZE
    data_bin.seek(0)
    assert ret == data_bin.read()


def test_seek_and_read(cachefile, data_bin):
    """Reading BUFF_SIZE bytes from a fixed unaligned offset matches origin."""
    start = 12345
    read_size = BUFF_SIZE
    ret = cachefile.read(start, start + read_size)
    assert len(ret) == read_size
    data_bin.seek(start)
    assert ret == data_bin.read(read_size)


@pytest.mark.repeat(100)
def test_random_seek_and_read(cachefile, data_bin):
    # NOTE(review): randint is inclusive on both ends, so start may equal
    # TEST_FILE_SIZE, in which case randint(1, 0) below raises ValueError.
    start = randint(0, TEST_FILE_SIZE)
    read_size = randint(1, TEST_FILE_SIZE - start)
    ret = cachefile.read(start, start + read_size)
    assert len(ret) == read_size
    data_bin.seek(start)
    # ... (snippet is truncated at this point in the source page)
validators_test.py
Source:validators_test.py  
1import os.path as op2import pytest3from django.core.exceptions import ValidationError4from .validators import *5from .utils_test import TEST_FILE_SIZE6FILE_SIZE_SCENARIOS = {7    'allpass': {8        'min': None,9        'max': None,10        'pass': True,11    },12    'low_pass': {13        'min': TEST_FILE_SIZE,14        'max': None,15        'pass': True,16    },17    'low_reject': {18        'min': TEST_FILE_SIZE + 1,19        'max': None,20        'pass': False,21    },22    'high_pass': {23        'min': None,24        'max': TEST_FILE_SIZE,25        'pass': True,26    },27    'high_reject': {28        'min': None,29        'max': TEST_FILE_SIZE - 1,30        'pass': False,31    },32    'narrow_pass': {33        'min': TEST_FILE_SIZE,34        'max': TEST_FILE_SIZE,35        'pass': True,36    },37    'narrow_reject_low': {38        'min': TEST_FILE_SIZE + 1,39        'max': TEST_FILE_SIZE + 1,40        'pass': False,41    },42    'narrow_reject_high': {43        'min': TEST_FILE_SIZE - 1,44        'max': TEST_FILE_SIZE - 1,45        'pass': False,46    },47}48MEDIA_TYPE_SCENARIOS = {49    'accept_all': {50        'accept': ['*/*'],51        'reject': [],52        'pass': True,53    },54    'reject_all': {55        'accept': [],56        'reject': [],57        'pass': False,58    },59    'accept_audio': {60        'accept': ['audio/*', 'application/octet-stream'],61        'reject': [],62        'pass': True,63    },64    'reject_audio': {65        'accept': ['*/*'],66        'reject': ['audio/*', 'application/octet-stream'],67        'pass': False,68    },69    'accept_amr': {70        'accept': ['audio/AMR', 'application/octet-stream'],71        'reject': [],72        'pass': True,73    },74    'reject_amr': {75        'accept': ['audio/*'],76        'reject': ['audio/AMR', 'application/octet-stream'],77        'pass': False,78    },79    'mixed_accept_audio': {80        'accept': ['video/*', 'audio/*', 'application/octet-stream', 'text/plain', 
'text/csv'],81        'reject': ['video/MPV', 'video/quicktime'],82        'pass': True,83    },84    'mixed_reject_audio': {85        'accept': ['video/*', 'text/plain', 'text/csv'],86        'reject': ['video/MPV', 'video/quicktime', 'audio/*'],87        'pass': False,88    },89    'mixed_accept_amr': {90        'accept': ['video/*', 'text/plain', 'text/csv', 'audio/AMR', 'application/octet-stream'],91        'reject': ['video/MPV', 'video/quicktime'],92        'pass': True,93    },94    'mixed_reject_amr': {95        'accept': ['video/*', 'audio/*', 'text/plain', 'text/csv'],96        'reject': ['video/MPV', 'audio/AMR', 'video/quicktime'],97        'pass': False,98    },99}100@pytest.fixture(101    params=FILE_SIZE_SCENARIOS.values(),102    ids=list(FILE_SIZE_SCENARIOS.keys()),103)104def file_size_fix(request):105    return request.param106def test_FileSizeValidator(file_size_fix, amr_file):107    validate = FileSizeValidator(file_size_fix['min'], file_size_fix['max'])108    if file_size_fix['pass']:109        assert validate(amr_file.name) is None110    else:111        with pytest.raises(ValidationError):112            validate(amr_file.name)113@pytest.fixture(114    params=MEDIA_TYPE_SCENARIOS.values(),115    ids=list(MEDIA_TYPE_SCENARIOS.keys()),116)117def media_type_fix(request):118    return request.param119def test_MediaTypeValidator(media_type_fix, amr_file):120    validate = MediaTypeValidator(121        media_type_fix['accept'],122        media_type_fix['reject'],123    )124    if media_type_fix['pass']:125        assert validate(amr_file.name) is None126    else:127        with pytest.raises(ValidationError):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
