How to use test_fetch method in avocado

Best Python code snippet using avocado_python

test_fetch.py

Source:test_fetch.py Github

copy

Full Screen

"""Tests for ``adstxt.fetch.fetch`` using a mocked ``ClientSession.get``."""
from asyncio import TimeoutError
import logging
from unittest.mock import call

import pytest
import asynctest
from aiohttp import client_exceptions

import adstxt.fetch as fetch

DUMMY_FETCH_DATA_CR = "foo\n\rbar\n\rbaz\n\r"
DUMMY_FETCH_DATA_NL = "foo\nbar\nbaz\n"
EXPECTED_RESULTS = ("foo", "bar", "baz")
USER_AGENT = 'testings'


class Headers():
    """Minimal stand-in for a response headers object.

    Wraps a plain dict and exposes ``get(key, default)`` plus a call
    form that returns the whole dict.
    """

    def __init__(self, headers):
        self._headers = headers

    def __call__(self):
        return self._headers

    def get(self, key, if_none):
        """Return the header stored under *key*, or *if_none* if absent."""
        return self._headers.get(key, if_none)


class MockSession():
    """This clones the bits of an aiohttp.Session so we can mock less.

    The object is its own async context manager, so it can be used as the
    return value of a patched ``ClientSession.get``.

    Args:
        text: Body returned by the ``text()`` coroutine.
        response: Value exposed through the ``status`` property.
        redirect: Redirect history list, or a falsy value for "none".
        headers: Dict wrapped in a ``Headers`` instance.
    """

    def __init__(self, text, response, redirect, headers):
        self._text = text
        self._response = response
        self._headers = headers
        self.headers = Headers(self._headers)
        # Any falsy ``redirect`` is normalised to an empty history list.
        self._history = redirect if redirect else []

    async def __aexit__(self, *args):
        pass

    async def __aenter__(self):
        return self

    @property
    def status(self):
        return self._response

    async def text(self):
        return self._text

    @property
    def history(self):
        return self._history


@pytest.mark.asyncio
async def test_fetch_cr(mocker):
    """Body lines separated by ``\\n\\r`` yield the expected records."""
    mock_get = mocker.patch.object(fetch.ClientSession, 'get')
    mock_get.return_value = MockSession(
        DUMMY_FETCH_DATA_CR, 200, False, {'Content-Type': 'text/plain'})
    test_fetch = await fetch.fetch('localhost', USER_AGENT)
    assert test_fetch.response == EXPECTED_RESULTS
    assert test_fetch.domain == 'localhost'


@pytest.mark.asyncio
async def test_fetch_nl(mocker):
    """Body lines separated by ``\\n`` yield the expected records."""
    mock_get = mocker.patch.object(fetch.ClientSession, 'get')
    mock_get.return_value = MockSession(
        DUMMY_FETCH_DATA_NL, 200, False, {'Content-Type': 'text/plain'})
    test_fetch = await fetch.fetch('localhost', USER_AGENT)
    assert test_fetch.response == EXPECTED_RESULTS
    assert test_fetch.domain == 'localhost'


@pytest.mark.asyncio
async def test_fetch_exceptions(mocker):
    """A timeout on the first request is followed by one sleep and a retry."""
    mock_get = mocker.patch.object(fetch.ClientSession, 'get')
    # Mock out sleep so tests don't take ages.
    mock_sleep = mocker.patch.object(fetch, 'sleep')
    mock_sleep_coroutine_mock = asynctest.CoroutineMock()
    mock_sleep.side_effect = mock_sleep_coroutine_mock
    mock_get.side_effect = [TimeoutError, MockSession(
        DUMMY_FETCH_DATA_NL, 200, False, {'Content-Type': 'text/plain'})]
    test_fetch = await fetch.fetch('localhost', USER_AGENT)
    assert test_fetch.response == EXPECTED_RESULTS
    assert test_fetch.domain == 'localhost'
    # Check we've backed off.
    assert mock_sleep_coroutine_mock.call_count == 1


@pytest.mark.asyncio
async def test_fetch_client_cert_invalid_retry_www(mocker):
    """A certificate error on the bare host is retried against ``www.``."""
    mock_get = mocker.patch.object(fetch.ClientSession, 'get')
    mock_get.side_effect = [
        client_exceptions.ClientConnectorCertificateError('a', 'b'),
        MockSession(
            DUMMY_FETCH_DATA_NL, 200, False, {'Content-Type': 'text/plain'})]
    test_fetch = await fetch.fetch('localhost', USER_AGENT)
    assert test_fetch.response == EXPECTED_RESULTS
    assert test_fetch.domain == 'localhost'
    # Assert that we go away from localhost and check www.localhost
    expected_calls = [call('http://localhost/ads.txt',
                           headers={'User-Agent': 'testings'}),
                      call('http://www.localhost/ads.txt',
                           headers={'User-Agent': 'testings'})]
    assert mock_get.mock_calls == expected_calls


@pytest.mark.asyncio
async def test_fetch_unicode_decode_error(mocker):
    """A ``UnicodeDecodeError`` from the request yields an empty result."""
    mock_get = mocker.patch.object(fetch.ClientSession, 'get')
    mock_get.side_effect = UnicodeDecodeError(
        'blah', b'\x00\x00', 1, 2, 'unicode is hard mmmkay')
    test_fetch = await fetch.fetch('localhost', USER_AGENT)
    assert test_fetch.response == ()
    assert test_fetch.domain == 'localhost'
    assert test_fetch.adstxt_present is False


@pytest.mark.asyncio
async def test_fetch_bad_page(mocker):
    """An HTML body served as ``text/plain`` is treated as no ads.txt."""
    mock_get = mocker.patch.object(fetch.ClientSession, 'get')
    mock_get.return_value = MockSession(
        '<!doctype html></br>', 200, False, {'Content-Type': 'text/plain'})
    test_fetch = await fetch.fetch('localhost', USER_AGENT)
    # Compare by equality: identity against a tuple literal relies on an
    # interpreter caching detail and raises SyntaxWarning on Python 3.8+.
    assert test_fetch.response == ()
    assert test_fetch.domain == 'localhost'
    assert test_fetch.adstxt_present is False


@pytest.mark.asyncio
async def test_fetch_html_content(mocker):
    """A ``text/html`` response is treated as no ads.txt."""
    mock_get = mocker.patch.object(fetch.ClientSession, 'get')
    mock_get.return_value = MockSession(
        '<!doctype html></br>', 200, False, {'Content-Type': 'text/html'})
    test_fetch = await fetch.fetch('localhost', USER_AGENT)
    assert test_fetch.response == ()
    assert test_fetch.domain == 'localhost'
    assert test_fetch.adstxt_present is False


@pytest.mark.asyncio
async def test_fetch_404(mocker):
    """A 404 status is treated as no ads.txt."""
    mock_get = mocker.patch.object(fetch.ClientSession, 'get')
    mock_get.return_value = MockSession(
        '<!doctype html><blink>', 404, False, {'Content-Type': 'text/html'})
    test_fetch = await fetch.fetch('localhost', USER_AGENT)
    assert test_fetch.response == ()
    assert test_fetch.domain == 'localhost'
    assert test_fetch.adstxt_present is False


class History():
    """Stand-in for one redirect-history entry; exposes only ``url``."""

    def __init__(self, url):
        self._url = url

    @property
    def url(self):
        return self._url


class Urls():
    """Stand-in for a URL object; exposes only ``host``."""

    def __init__(self, host):
        self._host = host

    @property
    def host(self):
        return self._host


@pytest.mark.asyncio
async def test_fetch_redirects_www_redirect(mocker):
    """A redirect from the bare domain to its ``www.`` host is accepted."""
    mock_get = mocker.patch.object(fetch.ClientSession, 'get')
    redirect_list = [History(Urls('ebay.co.uk')),
                     History(Urls('www.ebay.co.uk'))]
    mock_get.return_value = MockSession(
        DUMMY_FETCH_DATA_CR, 200, redirect_list,
        {'Content-Type': 'text/plain'})
    test_fetch = await fetch.fetch('ebay.co.uk', USER_AGENT)
    assert test_fetch.response == EXPECTED_RESULTS
    assert test_fetch.domain == 'ebay.co.uk'
    assert test_fetch.adstxt_present is True


@pytest.mark.asyncio
async def test_fetch_redirects_subdomain_redirect(mocker, caplog):
    """Redirects between subdomains of one root domain are accepted."""
    caplog.set_level(logging.DEBUG)
    mock_get = mocker.patch.object(fetch.ClientSession, 'get')
    # A triple subdomain redirect checks that we're always valid when staying
    # on the same root domain.
    redirect_list = [History(Urls('bar.foo.com')),
                     History(Urls('baz.foo.com')),
                     History(Urls('qux.foo.com'))]
    mock_get.return_value = MockSession(
        DUMMY_FETCH_DATA_CR, 200, redirect_list,
        {'Content-Type': 'text/plain'})
    test_fetch = await fetch.fetch('bar.foo.com', USER_AGENT)
    assert test_fetch.response == EXPECTED_RESULTS
    assert test_fetch.domain == 'bar.foo.com'
    assert test_fetch.adstxt_present is True


@pytest.mark.asyncio
async def test_fetch_redirects_offdomain_redirect(mocker, caplog):
    """A single redirect to a different root domain is accepted."""
    caplog.set_level(logging.DEBUG)
    mock_get = mocker.patch.object(fetch.ClientSession, 'get')
    # A single off domain redirect is valid.
    redirect_list = [History(Urls('foo.com')),
                     History(Urls('foo.bar.com'))]
    mock_get.return_value = MockSession(
        DUMMY_FETCH_DATA_CR, 200, redirect_list,
        {'Content-Type': 'text/plain'})
    test_fetch = await fetch.fetch('foo.com', USER_AGENT)
    assert test_fetch.response == EXPECTED_RESULTS
    assert test_fetch.domain == 'foo.com'
    assert test_fetch.adstxt_present is True


@pytest.mark.asyncio
async def test_fetch_redirects_bad_redirection_two_hops_off_site(mocker):
    """Two consecutive off-domain redirects produce an empty response."""
    mock_get = mocker.patch.object(fetch.ClientSession, 'get')
    redirect_list = [History(Urls('bad-redirect-domain.co.uk')),
                     History(Urls('some-other-domain.co.uk')),
                     History(Urls('some-different-domain.co.uk'))]
    mock_get.return_value = MockSession(
        DUMMY_FETCH_DATA_CR, 200, redirect_list,
        {'Content-Type': 'text/plain'})
    test_fetch = await fetch.fetch('bad-redirect-domain.co.uk', USER_AGENT)
    assert test_fetch.response == ()
    assert test_fetch.domain == 'bad-redirect-domain.co.uk'
    # NOTE(review): the source listing is truncated here ("..."); any
    # further assertions or tests are not visible.

Full Screen

Full Screen

main.py

Source:main.py Github

copy

Full Screen

"""FastAPI service that tracks users, coins, levels and stats in Deta Base."""
from fastapi import FastAPI, HTTPException
from deta import Deta
# from config import config

app = FastAPI()
# NOTE(review): the project key is hard-coded in source. It should be loaded
# from configuration/environment and the exposed key rotated.
deta = Deta('c0jrkegd_rQMSFSXNmgb14J4dBL7hGzbrZq2YV12f')
db_users = deta.Base("users")
db_coins = deta.Base("coins")
db_level = deta.Base("levels")
db_links = deta.Base("links")
db_users_stat = deta.Base("users_stat")

# Boolean fields of a ``users_stat`` row that each count towards the score.
_SCORED_FLAGS = (
    "First_lvl", "Second_lvl", "Third_lvl",
    "is_invest", "is_social", "is_vtb",
)

# Level id -> (completion flag field, parameter field) in ``users_stat``.
_LEVEL_FIELDS = {
    1: ("First_lvl", "First_par"),
    2: ("Second_lvl", "Second_par"),
    3: ("Third_lvl", "Third_par"),
}


def _record_for(base, token, field=None):
    """Return the first row in *base* for *token*, optionally having *field*.

    Raises:
        HTTPException: 404 when no matching row exists, instead of the
            ``IndexError`` that indexing an empty result would raise.
    """
    for item in base.fetch({"Token": token}).items:
        if field is None or field in item:
            return item
    raise HTTPException(status_code=404,
                        detail="No record found for this token")


def _completion_score(row):
    """Return the fraction of scored flags that are truthy in *row*."""
    return sum(1 for flag in _SCORED_FLAGS if row[flag]) / len(_SCORED_FLAGS)


def _set_flag_once(token, flag):
    """Set boolean *flag* on the user's stat row unless it is already set."""
    record = _record_for(db_users_stat, token)
    if not record[flag]:
        db_users_stat.update({flag: True}, record["key"])


@app.post("/create_new_user")
async def create_new_user(username: str, token: str, segment: int):
    """Create the initial rows for a new user in every base."""
    db_users.put({
        "Username": username,
        "Token": token,
        "Segment": segment
    })
    db_coins.put({
        "Coins": 0,
        "Token": token
    })
    # The key must be "Level": that is the field count_levels reads and
    # writes (the previous "levels" key made count_levels fail).
    db_level.put({
        "Level": 0,
        "Token": token
    })
    # The link row belongs in the links base; it used to be written into
    # db_coins, leaving db_links unused and a Coins-less row among the
    # user's coin rows.
    db_links.put({
        "Token": token,
        "link": False
    })
    db_users_stat.put({
        "First_lvl": False,
        "First_par": "",
        "Second_lvl": False,
        "Second_par": "",
        "Third_lvl": False,
        "Third_par": "",
        "Token": token,
        "is_vtb": False,
        "is_invest": False,
        "is_social": False
    })


@app.post("/insert_coins")
async def insert_coins(n: int, token: str):
    """Add *n* coins to the user's balance."""
    # Users created earlier may still have a {"Token", "link"} row in this
    # base, so pick the row that actually carries a balance.
    wallet = _record_for(db_coins, token, "Coins")
    db_coins.update({"Coins": wallet["Coins"] + n}, wallet["key"])


@app.post("/count_levels")
async def count_levels(token: str):
    """Increment the user's level counter by one."""
    record = _record_for(db_level, token)
    # Rows created before the key was unified have no "Level" field yet.
    db_level.update({"Level": record.get("Level", 0) + 1}, record["key"])


@app.post("/level_info")
async def level_info(lvl_id: int, par: str, token: str):
    """Mark level *lvl_id* (1-3) as completed and store *par* for it.

    Nothing is written when the level id is unknown or the level is
    already marked as completed.
    """
    record = _record_for(db_users_stat, token)
    fields = _LEVEL_FIELDS.get(lvl_id)
    if fields is None:
        return
    done_field, par_field = fields
    if not record[done_field]:
        db_users_stat.update({done_field: True, par_field: par},
                             record["key"])


@app.post("/is_vtb")
async def post_vtb_info(token: str):
    """Set the user's ``is_vtb`` flag if it is not set yet."""
    _set_flag_once(token, "is_vtb")


@app.post("/is_invest")
async def post_invest_info(token: str):
    """Set the user's ``is_invest`` flag if it is not set yet."""
    _set_flag_once(token, "is_invest")


@app.post("/is_social")
async def post_social_info(token: str):
    """Set the user's ``is_social`` flag if it is not set yet."""
    _set_flag_once(token, "is_social")


def get_stat(token: str):
    """Return the user's completion score, a float between 0 and 1.

    Raises:
        IndexError: If no stat row exists for *token*.
    """
    test_fetch = db_users_stat.fetch({"Token": token})
    return _completion_score(test_fetch.items[0])


@app.get("/stat")
async def get_many_stat():
    """Compute a completion score for every row in ``users_stat``."""
    res = db_users_stat.fetch()
    all_items = res.items
    # fetch until last is 'None'
    while res.last:
        res = db_users_stat.fetch(last=res.last)
        all_items += res.items
    for row in all_items:
        row["score"] = _completion_score(row)
    # NOTE(review): the source listing is truncated here ("..."); the rest
    # of this handler (presumably returning all_items) is not visible.

Full Screen

Full Screen

fetch.py

Source:fetch.py Github

copy

Full Screen

1# fetch.py -- example about declaring cursors2#3# Copyright (C) 2001-2010 Federico Di Gregorio <fog@debian.org>4#5# psycopg2 is free software: you can redistribute it and/or modify it6# under the terms of the GNU Lesser General Public License as published7# by the Free Software Foundation, either version 3 of the License, or8# (at your option) any later version.9#10# psycopg2 is distributed in the hope that it will be useful, but WITHOUT11# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public13# License for more details.14## put in DSN your DSN string15DSN = 'dbname=test'16## don't modify anything below this line (except for experimenting)17import sys18import psycopg219if len(sys.argv) > 1:20 DSN = sys.argv[1]21print "Opening connection using dns:", DSN22conn = psycopg2.connect(DSN)23print "Encoding for this connection is", conn.encoding24curs = conn.cursor()25try:26 curs.execute("CREATE TABLE test_fetch (val int4)")27except:28 conn.rollback()29 curs.execute("DROP TABLE test_fetch")30 curs.execute("CREATE TABLE test_fetch (val int4)")31conn.commit()32# we use this function to format the output33def flatten(l):34 """Flattens list of tuples l."""35 return map(lambda x: x[0], l)36# insert 20 rows in the table37for i in range(20):38 curs.execute("INSERT INTO test_fetch VALUES(%s)", (i,))39conn.commit()40# does some nice tricks with the transaction and postgres cursors41# (remember to always commit or rollback before a DECLARE)42#43# we don't need to DECLARE ourselves, psycopg now supports named44# cursors (but we leave the code here, comments, as an example of45# what psycopg is doing under the hood)46#47#curs.execute("DECLARE crs CURSOR FOR SELECT * FROM test_fetch")48#curs.execute("FETCH 10 FROM crs")49#print "First 10 rows:", flatten(curs.fetchall())50#curs.execute("MOVE -5 FROM crs")51#print "Moved back cursor by 5 rows (to row 5.)"52#curs.execute("FETCH 10 FROM crs")53#print 
"Another 10 rows:", flatten(curs.fetchall())54#curs.execute("FETCH 10 FROM crs")55#print "The remaining rows:", flatten(curs.fetchall())56ncurs = conn.cursor("crs")57ncurs.execute("SELECT * FROM test_fetch")58print "First 10 rows:", flatten(ncurs.fetchmany(10))59ncurs.scroll(-5)60print "Moved back cursor by 5 rows (to row 5.)"61print "Another 10 rows:", flatten(ncurs.fetchmany(10))62print "Another one:", list(ncurs.fetchone())63print "The remaining rows:", flatten(ncurs.fetchall())64conn.rollback()65curs.execute("DROP TABLE test_fetch")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful