Best Python code snippet using fMBT_python
test_receiver.py
Source:test_receiver.py  
...25    def finish_connect(self):26        protocol = ReceiverProtocol(self)27        protocol.makeConnection(self.transport)28        self.connected.callback(protocol)29    def write_response(self, msg):30        self.protocol.lineReceived(msg)31    def sent(self):32        data = self.transport.value()33        self.transport.clear()34        return data35def test_receiverbase_connect_then_write():36    class TestParent(MockedConnectionReceiver):37        test_connects_directly = False38    parent = TestParent("fakehost")39    parent.finish_connect()40    parent.write("Test")41    assert parent.sent() == b"Test\r"42def test_receiverbase_write_before_connect():43    class TestParent(MockedConnectionReceiver):44        test_connects_directly = False45    parent = TestParent("fakehost")46    parent.write("Test")47    assert parent.sent() == b""48    parent.finish_connect()49    assert parent.sent() == b"Test\r"50def test_receiverbase_multiple_writes_wait_for_each_response():51    class TestParent(MockedConnectionReceiver):52        test_connects_directly = False53        control = Control("PW")54    parent = TestParent("fakehost")55    parent.finish_connect()56    parent.write("Test")57    parent.write("Test2")58    parent.write("Test3")59    assert parent.sent() == b"Test\r"60    parent.write_response(b"Response")61    assert parent.sent() == b"Test2\r"62    parent.write_response(b"Response2")63    assert parent.sent() == b"Test3\r"64@pytest_twisted.inlineCallbacks65def test_receiverbase_multiple_writes_with_no_response():66    class TestParent(MockedConnectionReceiver):67        test_connects_directly = False68        control = Control("PW")69    parent = TestParent("fakehost", 0.1)70    parent.finish_connect()71    parent.write("Test")72    parent.write("Test2")73    parent.write("Test3")74    assert parent.sent() == b"Test\r"75    def check():76        assert parent.sent() == b"Test2\rTest3\r"77    yield reactor.callLater(1.0, check)78def 
test_receiverbase_multiple_writes_with_no_timeout():79    class TestParent(MockedConnectionReceiver):80        test_connects_directly = False81        control = Control("PW")82    parent = TestParent("fakehost", 0)83    parent.finish_connect()84    parent.write("Test")85    parent.write("Test2")86    parent.write("Test3")87    assert parent.sent() == b"Test\rTest2\rTest3\r"88@pytest_twisted.inlineCallbacks89def test_control__name_only():90    class TestParent(MockedConnectionReceiver):91        control = Control("PW")92    parent = TestParent("fakehost")93    d = parent.control94    assert parent.sent() == b"PW?\r"95    parent.write_response(b"PWOFF")96    d.addTimeout(1, reactor)97    value = yield d98    assert value == "OFF"99    parent.control = "ON"100    assert parent.sent() == b"PWON\r"101@pytest_twisted.inlineCallbacks102def test_control__override_status_command():103    class TestParent(MockedConnectionReceiver):104        control = Control("PW", status_command="PW ?")105    parent = TestParent("fakehost")106    d = parent.control107    assert parent.sent() == b"PW ?\r"108    parent.write_response(b"PWOFF")109    d.addTimeout(1, reactor)110    value = yield d111    assert value == "OFF"112    parent.control = "ON"113    assert parent.sent() == b"PWON\r"114@pytest_twisted.inlineCallbacks115def test_control__override_set_command():116    class TestParent(MockedConnectionReceiver):117        control = Control("PW", set_command="PWA")118    parent = TestParent("fakehost")119    d = parent.control120    assert parent.sent() == b"PW?\r"121    parent.write_response(b"PWOFF")122    d.addTimeout(1, reactor)123    value = yield d124    assert value == "OFF"125    parent.control = "ON"126    assert parent.sent() == b"PWAON\r"127@pytest_twisted.inlineCallbacks128def test_control__override_response_prefix():129    class TestParent(MockedConnectionReceiver):130        control = Control("PW", response_prefix="PWA")131    parent = TestParent("fakehost")132    d = 
parent.control133    assert parent.sent() == b"PW?\r"134    parent.write_response(b"PWAOFF")135    d.addTimeout(1, reactor)136    value = yield d137    assert value == "OFF"138    parent.control = "ON"139    assert parent.sent() == b"PWON\r"140@pytest_twisted.inlineCallbacks141def test_control__incorrect_response():142    class TestParent(MockedConnectionReceiver):143        control = Control("PW")144    parent = TestParent("fakehost")145    d = parent.control146    assert parent.sent() == b"PW?\r"147    parent.write_response(b"RWAUTO")148    d.addTimeout(1, reactor)149    with pytest.raises(TimeoutError):150        yield d151@pytest_twisted.inlineCallbacks152def test_control__ignore_other_responses():153    class TestParent(MockedConnectionReceiver):154        control = Control("PW")155    parent = TestParent("fakehost")156    d = parent.control157    assert parent.sent() == b"PW?\r"158    parent.write_response(b"CV20")159    parent.write_response(b"MV30")160    parent.write_response(b"MUON")161    parent.write_response(b"PWOFF")162    d.addTimeout(1, reactor)163    value = yield d164    assert value == "OFF"165@pytest_twisted.inlineCallbacks166def test_numericcontrol():167    class TestParent(MockedConnectionReceiver):168        control = NumericControl("MV")169    parent = TestParent("fakehost")170    d = parent.control171    assert parent.sent() == b"MV?\r"172    parent.write_response(b"MV50")173    d.addTimeout(1, reactor)174    value = yield d175    assert value == 50176    parent.write_response(b"MV70")177    d = parent.control178    assert not parent.sent()179    d.addTimeout(1, reactor)180    value = yield d181    assert value == 70182    parent.control = 4183    assert parent.sent() == b"MV04\r"184    parent.write_response(b"MV04")185    parent.control = 38186    assert parent.sent() == b"MV38\r"187@pytest_twisted.inlineCallbacks188def test_numericcontrol__invalid_response():189    class TestParent(MockedConnectionReceiver):190        control = 
NumericControl("MV")191    parent = TestParent("fakehost")192    d = parent.control193    assert parent.sent() == b"MV?\r"194    parent.write_response(b"MVO")195    parent.write_response(b"MVTRUE")196    parent.write_response(b"MVFALSE")197    parent.write_response(b"MV")198    parent.write_response(b"MVB30")199    d.addTimeout(1, reactor)200    with pytest.raises(TimeoutError):201        yield d202def test_numericcontrol__invalid_input():203    class TestParent(MockedConnectionReceiver):204        control = NumericControl("MV")205    parent = TestParent("fakehost")206    with pytest.raises(ValueError):207        parent.control = -1208    with pytest.raises(ValueError):209        parent.control = 100210@pytest_twisted.inlineCallbacks211def test_numericcontrol__more_digits():212    class TestParent(MockedConnectionReceiver):213        control = NumericControl("MV", digits=4)214    parent = TestParent("fakehost")215    parent.write_response(b"MV0050")216    value = yield parent.control.addTimeout(1, reactor)217    assert value == 50218    parent.write_response(b"MV0700")219    value = yield parent.control.addTimeout(1, reactor)220    assert value == 700221    parent.control = 4222    assert parent.sent() == b"MV0004\r"223    parent.write_response(b"MV0004")224    parent.control = 38225    assert parent.sent() == b"MV0038\r"226    parent.write_response(b"MV0038")227    parent.control = 210228    assert parent.sent() == b"MV0210\r"229    parent.write_response(b"MV0210")230    parent.control = 3233231    assert parent.sent() == b"MV3233\r"232    parent.write_response(b"MV3233")233def test_volumecontrol():234    class TestParent(MockedConnectionReceiver):235        control = VolumeControl("MV")236    parent = TestParent("fakehost")237    parent.control = "+"238    assert parent.sent() == b"MVUP\r"239    parent.write_response(b"MV31")240    parent.control = "-"241    assert parent.sent() == b"MVDOWN\r"242    
parent.write_response(b"MV30")243@pytest_twisted.inlineCallbacks244def test_enumcontrol():245    class TestEnum(Enum):246        Auto = "AUTO"247        HDMI = "HDMI"248        Digital = "DIGITAL"249    class TestParent(MockedConnectionReceiver):250        control = EnumControl("SD", enum_type=TestEnum)251    parent = TestParent("fakehost")252    parent.write_response(b"SDAUTO")253    value = yield parent.control.addTimeout(1, reactor)254    assert value == TestEnum.Auto255    parent.write_response(b"SDHDMI")256    value = yield parent.control.addTimeout(1, reactor)257    assert value == TestEnum.HDMI258    parent.write_response(b"SDDIGITAL")259    value = yield parent.control.addTimeout(1, reactor)260    assert value == TestEnum.Digital261    parent.write_response(b"SDNO")262    with pytest.raises(TimeoutError):263        yield parent.control.addTimeout(1, reactor)264    assert parent.sent() == b"SD?\r"265    parent.write_response(b"SDNO")266    parent.control = TestEnum.Auto...views.py
Source:views.py  
...8    body = get_body(r)9    try:10        d = json.loads(body)11    except ValueError:12        write_response(w, '{"error": "incorrect json"}')13        return None14    return d15def not_found(r, w, params):16    write_response(w, '{"error": "404"}')17def register(r, w, params):18    d = get_json_or_error(r, w)19    if d is None:20        return21    if "username" not in d or "password" not in d:22        write_response(w, '{"error": "no username or password in body"}')23        return24    username = d["username"]25    password = d["password"]26    u = find_user(username)27    if u is not None:28        write_response(w, '{"error": "user already exists"}')29        return30    if not validate(username):31        write_response(w, '{"error": "invalid username or password"}')32        return33    create_user(username, password)34    write_response(w, '{"result": "ok"}')35def login(r, w, params):36    d = get_json_or_error(r, w)37    if d is None:38        return39    if "username" not in d or "password" not in d:40        write_response(w, '{"error": "no username or password in body"}')41        return42    username = d["username"]43    password = d["password"]44    u = find_user(username)45    if u is None:46        write_response(w, '{"error": "no such user"}')47        return48    if u.password != password:49        write_response(w, '{"error": "incorrect password"}')50        return51    mng = get_session_manager()52    session_name = mng.init_session()53    mng.update_session(session_name, username)54    set_cookies(w, { 'session': session_name })55    write_response(w, '{"result": "ok"}')56def list_kozinaks(r, w, params):57    os.chdir("/db/kozi")58    kozi = filter(os.path.isfile, os.listdir("/db/kozi"))59    kozi = [os.path.join("/db/kozi", f) for f in kozi]60    kozi.sort(key=lambda x: os.path.getmtime(x))61    kozi = [f[9:] for f in kozi][::-1]62    write_response(w, json.dumps({63        "result": kozi64    }))65def create_kozi(r, w, params):66    
cookies = parse_cookies(r)67    if "session" not in cookies:68        write_response(w, '{"error": "auth is required"}')69        return70    mng = get_session_manager()71    username = mng.get_session(cookies["session"])72    if username is None:73        write_response(w, '{"error": "auth is required"}')74        return75    u = find_user(username)76    if u is None:77        write_response(w, '{"error": "auth is required"}')78        return79    d = get_json_or_error(r, w)80    if d is None:81        return82    if "name" not in d or "fortune" not in d or "checksum" not in d:83        write_response(w, '{"error": "no name or fortune or checksum in body"}')84        return85    name = d["name"]86    if not validate(name):87        write_response(w, '{"error": "invalid name"}')88        return89    k = find_kozinak("%s_%s" % (username, name))90    if k is not None:91        write_response(w, '{"error": "kozinak already exists"}')92        return93    fortune = d["fortune"]94    checksum = d["checksum"]95    pipe = d["pipe"] if "pipe" in d else None96    if create_kozinak_chk(name, fortune, checksum, username, pipe):97        write_response(w, '{"result": "ok"}')98    else:99        write_response(w, '{"error": "incorrect checksum"}')100def get_kozi(r, w, params):101    if len(params) != 1:102        write_response(w, '{"error": "no kozi in params"}')103        return104    kozi_name = params[0]105    k = find_kozinak(kozi_name)106    if k is None:107        write_response(w, '{"error": "no such kozinak"}')108        return109    if k.owner != "open":110        cookies = parse_cookies(r)111        if "session" not in cookies:112            write_response(w, '{"error": "auth is required"}')113            return114        mng = get_session_manager()115        username = mng.get_session(cookies["session"])116        if username is None:117            write_response(w, '{"error": "auth is required"}')118            return119        u = find_user(username)120        if 
u is None:121            write_response(w, '{"error": "auth is required"}')122            return123        if not chk_owner(username, k.owner):124            write_response(w, '{"error": "no access"}')125            return126    write_response(w, json.dumps({127        'result': json.loads(k.to_json())128    }))129def open_kozi(r, w, params):130    cookies = parse_cookies(r)131    if "session" not in cookies:132        write_response(w, '{"error": "auth is required"}')133        return134    mng = get_session_manager()135    username = mng.get_session(cookies["session"])136    if username is None:137        write_response(w, '{"error": "auth is required"}')138        return139    u = find_user(username)140    if u is None:141        write_response(w, '{"error": "auth is required"}')142        return143    d = get_json_or_error(r, w)144    if d is None:145        return146    if "kozi" not in d:147        write_response(w, '{"error": "no kozi in body"}')148        return149    kozi_name = d["kozi"]150    k = find_kozinak(kozi_name)151    if k is None:152        write_response(w, '{"error": "no such kozinak"}')153        return154    if not chk_owner(username, k.owner):155        write_response(w, '{"error": "no access"}')156        return157    k.delete()158    k.set_owner("open")159    k.save()160    write_response(w, '{"result": "ok"}')161def copy_kozi(r, w, params):162    cookies = parse_cookies(r)163    if "session" not in cookies:164        write_response(w, '{"error": "auth is required"}')165        return166    mng = get_session_manager()167    username = mng.get_session(cookies["session"])168    if username is None:169        write_response(w, '{"error": "auth is required"}')170        return171    u = find_user(username)172    if u is None:173        write_response(w, '{"error": "auth is required"}')174        return175    d = get_json_or_error(r, w)176    if d is None:177        return178    if "url" not in d or "name" not in d:179        write_response(w, 
'{"error": "no name or url in body"}')180        return181    url = d["url"]182    name = d["name"]183    if not validate(name):184        write_response(w, '{"error": "invalid name"}')185        return186    if url.lower().startswith("gopher") or url.lower().startswith("file"):187        write_response(w, '{"error": "invalid url"}')188        return189    req = urlopen(url)190    data = req.read()191    try:192        j = json.loads(data)193    except:194        write_response(w, '{"error": "incorrect json %s"}' % data)195        return196    if "result" not in j:197        write_response(w, '{"error": "incorrect json %s"}' % data)198        return199    j = j["result"]200    if type(j) != type({}):201        write_response(w, '{"error": "incorrect json %s"}' % data)202        return203    if "name" not in j or "fortune" not in j or "owner" not in j or "pipe" not in j:204        write_response(w, '{"error": "no name or fortune or owner or pipe in body"}')205        return206    fortune = j["fortune"]207    pipe = j["pipe"]208    k = find_kozinak("%s_%s" % (username, name))209    if k is not None:210        write_response(w, '{"error": "kozinak already exists"}')211        return212    create_kozinak(name, fortune, username, pipe)213    write_response(w, '{"result": "ok"}')214def exec_kozi(r, w, params):215    if len(params) != 1:216        write_response(w, '{"error": "no kozi in params"}')217        return218    kozi_name = params[0]219    k = find_kozinak(kozi_name)220    if k is None:221        write_response(w, '{"error": "no such kozinak"}')222        return223    if k.owner != "open":224        cookies = parse_cookies(r)225        if "session" not in cookies:226            write_response(w, '{"error": "auth is required"}')227            return228        mng = get_session_manager()229        username = mng.get_session(cookies["session"])230        if username is None:231            write_response(w, '{"error": "auth is required"}')232            return233     
   u = find_user(username)234        if u is None:235            write_response(w, '{"error": "auth is required"}')236            return237        if not chk_owner(username, k.owner):238            write_response(w, '{"error": "no access"}')239            return240    result = ""241    while True:242        result += k.fortune243        if k.pipe is not None:244            newk = find_kozinak(k.pipe)245            if newk is not None:246                k = newk247            else:248                break249        else:250            break251    write_response(w, json.dumps({252        'result': result...create_snapshots.py
Source:create_snapshots.py  
1import os2import json3from usgs import api4testdir = os.path.dirname(os.path.abspath(__file__))5def write_response(response, filename):6    fpath = os.path.join(testdir, 'data', filename)7    with open(fpath, 'w') as f:8        json.dump(response, f)9def create_snapshots():10    """11    Run requests against USGS API for use in tests.12    """13    api_key = api.login(os.environ['USGS_USERNAME'], os.environ['USGS_PASSWORD'])14    # Dataset Fields15    response = api.dataset_fields("LANDSAT_8_C1", "EE", api_key=api_key)16    write_response(response, 'dataset-fields.json')17    # Datasets18    response = api.datasets(None, "EE")19    write_response(response, 'datasets.json')20    # Download21    response = api.download("LANDSAT_8_C1", "EE", ["LC80810712017104LGN00"], product='STANDARD')22    write_response(response, 'download.json')23    # Download Options24    response = api.download_options("LANDSAT_8_C1", "EE", ["LC80810712017104LGN00"])25    write_response(response, 'download-options.json')26    # Metadata27    response = api.metadata("LANDSAT_8_C1", "EE", ["LC80810712017104LGN00"])28    write_response(response, 'metadata.json')29    # Search30    response = api.search("LANDSAT_8_C1", "EE", start_date='20170401', end_date='20170402', max_results=10)31    write_response(response, 'search.json')32    api.logout(api_key)33if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
