How to use the _select_one method in yandex-tank

Best Python code snippet using yandex-tank

database.py

Source:database.py Github

copy

Full Screen

# NOTE: this excerpt comes from inside a class whose header is not shown.
# Every function below is a method of that class (note the ``self``
# parameter) and must be indented one level when placed back in the class.
#
# ... (excerpt begins mid-method; visible tail of the preceding helper,
# presumably the ``_execute`` that the methods below call)
#     db = self.db
#     cursor = db.cursor()
#     cursor.execute(sql, args)
#     return cursor


def _select_one(self, sql, *args):
    """Run ``sql`` with ``args`` and return the result of ``fetchone()``.

    The cursor obtained from ``self._execute`` is closed before returning,
    including when ``fetchone()`` raises.
    """
    cursor = self._execute(sql, *args)
    try:
        return cursor.fetchone()
    finally:
        cursor.close()


def _select(self, sql, *args):
    """Run ``sql`` with ``args`` and return the result of ``fetchall()``.

    The cursor obtained from ``self._execute`` is closed before returning,
    including when ``fetchall()`` raises.
    """
    cursor = self._execute(sql, *args)
    try:
        return cursor.fetchall()
    finally:
        cursor.close()


def get_cards(self):
    """Return ``(card_id, card_key)`` rows for cards joined to a user row."""
    sql = """select cs_card.card_id, card_key
             from cs_card
             inner join main.cs_user on cs_card.card_id = cs_user.card_id"""
    return self._select(sql)


def find_card(self, card_key):
    """Return the ``card_id`` stored for ``card_key``, or -1 if none matches."""
    sql = """select card_id from cs_card where card_key=?"""
    row = self._select_one(sql, card_key)
    if not row:
        return -1
    return row[0]


def find_user_can_vote_by_card(self, card_key):
    """Return 1 if the user holding ``card_key`` has a truthy ``can_vote``.

    Returns 0 when the card is unknown (``find_card`` gives a value <= 0),
    when no user row references the card, or when ``can_vote`` is falsy.
    """
    card_id = self.find_card(card_key)
    if card_id <= 0:
        return 0
    sql = """select can_vote from main.cs_user where card_id = ?"""
    row = self._select_one(sql, card_id)
    return 1 if row and row[0] else 0


def find_user_by_card(self, card_key):
    """Return the ``user_id`` holding ``card_key``, or 0 when there is none."""
    card_id = self.find_card(card_key)
    if card_id <= 0:
        return 0
    sql = """select user_id from cs_user where card_id = ?"""
    row = self._select_one(sql, card_id)
    if row and row[0]:
        return row[0]
    return 0


def couple_card(self, person_id, card_key):
    """Assign the card identified by ``card_key`` to user ``person_id``.

    The card row is created through ``add_card`` when it does not exist yet.
    If another user currently holds the card, that user's ``card_id`` is
    reset to 0 first.

    Returns:
        ``(person_id, 0)`` when the card already belongs to ``person_id``,
        otherwise ``(person_id, previous_user_id)`` where the previous id is
        whatever ``find_card_user`` reported (0 when nobody held the card).

    Raises:
        RuntimeError: the card is still not found after ``add_card``.
    """
    card_id = self.find_card(card_key)
    if card_id < 0:
        self.add_card(card_key)
        card_id = self.find_card(card_key)
    if card_id < 0:
        # Was ``raise BaseException('')``: RuntimeError is still caught by any
        # ``except BaseException`` handler and now carries a message.
        raise RuntimeError(
            'could not find or create a card for card_key %r' % (card_key,))
    user_id = self.find_card_user(card_id)
    if user_id == person_id:
        # already ok
        return person_id, 0
    print('ok')  # trace output kept unchanged from the original
    # remove card_id from old user
    if user_id > 0:
        sql = """update cs_user set card_id=0 where user_id=?"""
        self._execute(sql, user_id)
    # set card_id for new user
    sql = """update cs_user set card_id=? where user_id=?"""
    self._execute(sql, card_id, person_id)
    self.commit()
    return person_id, user_id


def add_card(self, card_key):
    """Return the ``card_id`` for ``card_key``, inserting the card if needed."""
    sql_lookup = """select card_id from cs_card where card_key=?"""
    row = self._select_one(sql_lookup, card_key)
    if row:
        print('card_key exists')
        return row[0]
    sql_insert = """insert into cs_card(card_key) values(?)"""
    self._execute(sql_insert, card_key)
    self.commit()
    # Read the row back to obtain the id of the card just inserted.
    row = self._select_one(sql_lookup, card_key)
    return row[0]


def find_card_user(self, card_id):
    """Return the ``user_id`` whose row references ``card_id``, or 0."""
    sql = """select user_id from cs_user where card_id = ?"""
    row = self._select_one(sql, card_id)
    if not row:
        return 0
    return row[0]


def get_users(self):
    """Return ``(user_id, user_key, user_name, party_id)`` for every user."""
    sql = """select user_id, user_key, user_name, party_id
             from cs_user """
    return self._select(sql)


def get_user(self, user_id=None, user_key=None):
    """Return one ``(user_id, user_key, user_name, party_id)`` row.

    ``user_id`` takes precedence when both arguments are truthy. Returns
    ``[]`` when neither is given, otherwise whatever ``_select_one`` returns.
    """
    if not user_key and not user_id:
        return []
    sql = """select user_id, user_key, user_name, party_id from cs_user """
    if user_id:
        sql_where = """where user_id = ?"""
        srch = user_id
    else:
        # The guard above guarantees user_key is truthy on this path.
        sql_where = """where user_key = ?"""
        srch = user_key
    return self._select_one(sql + sql_where, srch)


def get_user_full(self, user_id):
    """Return the user row including ``card_id`` and ``can_vote``.

    Returns ``[]`` for a falsy ``user_id``, otherwise whatever
    ``_select_one`` returns.
    """
    if not user_id:
        return []
    sql = """select user_id, user_key, user_name, party_id, card_id, can_vote from cs_user """
    sql_where = """where user_id = ?"""
    return self._select_one(sql + sql_where, user_id)


def save_user(self, user_id=None, user_key='', user_name='', party_id=None, can_vote=0):
    """Insert the user row for ``user_id``, or update it when it exists.

    ``can_vote`` is normalised to 0/1: a digit-only ``str``/``bytes`` value
    is converted with ``int`` first, then any truthy value becomes 1.
    The change is committed on ``self.db``.
    """
    if isinstance(can_vote, (str, bytes)) and can_vote.isdigit():
        can_vote = int(can_vote)
    can_vote = 1 if can_vote else 0
    sql_exists = """select user_id from cs_user where user_id=?"""
    sql_insert = """
        insert into cs_user(user_id, user_key, user_name, party_id, can_vote)
        values(?, ?, ?, ?, ?)
    """
    sql_update = """update cs_user set user_key = ?, user_name=?, party_id=?, can_vote=? where user_id=?"""
    cursor = self._execute(sql_exists, user_id)
    try:
        res = cursor.fetchone()
        if not res:
            cursor.execute(sql_insert, (user_id, user_key, user_name, party_id, can_vote))
        else:
            cursor.execute(sql_update, (user_key, user_name, party_id, can_vote, user_id))
    finally:
        # Close the cursor even when one of the statements raises.
        cursor.close()
    self.db.commit()


def get_positions(self):
    """Return every ``(pos_id, pos_user_id, pos_group_id)`` row."""
    sql = """select pos_id, pos_user_id, pos_group_id from cs_group_position"""
    return self._select(sql)


def get_user_from_position(self, position_id, group_id=0):
    """Return the ``pos_user_id`` at ``position_id`` in ``group_id``, or 0."""
    sql = """select pos_user_id from cs_group_position where pos_id=? and pos_group_id=?"""
    row = self._select_one(sql, position_id, group_id)
    print('db get_user_from_position', row)
    if row:
        return row[0]
    return 0


def get_users_and_positions(self, group_id=0):
    """Return every user with their position in ``group_id`` and card key.

    Returns:
        A list, ordered by ``user_name``, of tuples
        ``(user_id, user_name, mic_id, party_id, group_id, card_key, can_vote)``.
        ``mic_id`` is the user's ``pos_id`` in the group (0 when they have
        none) and ``card_key`` is ``''`` when no card row matches.
    """
    sql = """select user_id,
                    user_name,
                    party_id,
                    card_id,
                    can_vote
             from cs_user
             order by user_name"""
    users = self._select(sql)
    sql = """select pos_user_id,
                    pos_id,
                    pos_group_id
             from cs_group_position
             where pos_group_id = ?
          """
    positions = self._select(sql, group_id)
    sql = """select card_id, card_key from cs_card order by card_id"""
    cards = self._select(sql)

    # Index both lookups once instead of rescanning the row lists per user.
    # ``setdefault`` keeps the first row per key, matching the first-match
    # behaviour of the original nested loops.
    pos_by_user = {}
    for position in positions:
        pos_by_user.setdefault(position[0], position[1])
    key_by_card = {}
    for card in cards:
        key_by_card.setdefault(card[0], card[1])

    result = []
    for user in users:
        mic_id = pos_by_user.get(user[0], 0)  # mic_id ~ pos_id
        card_key = key_by_card.get(user[3], '')
        result.append((user[0], user[1], mic_id, user[2], group_id, card_key, user[4]))
    return result


def map_cards(self, cards, card_id):
    """Return the key paired with ``card_id`` in ``cards``, or ``''``.

    ``cards`` is an iterable of ``(card_id, card_key)`` pairs; the first
    pair whose id equals ``card_id`` wins.
    """
    for card_id_val, card_key_val in cards:
        if card_id == card_id_val:
            return card_key_val
    return ''


def get_user_and_position(self, user_id, group_id=0):
    """Return one user with their position in ``group_id`` and card key.

    Returns:
        ``(user_id, user_name, pos_id, party_id, group_id, card_key, can_vote)``.
        When the user has no position in the group, ``pos_id`` is 0 and the
        ``group_id`` argument is echoed back. Returns ``None`` when the user
        does not exist.
    """
    cards = self.get_cards()
    sql = """select user_id,
                    user_name,
                    pos_id,
                    party_id,
                    pos_group_id,
                    card_id,
                    can_vote
             from cs_user
             left outer join cs_group_position on cs_user.user_id = cs_group_position.pos_user_id
             where user_id = ? and pos_group_id=?
             order by user_id"""
    result = self._select_one(sql, user_id, group_id)
    print(result, '----')
    if result:
        card_key = self.map_cards(cards, result[5])
        result = (result[0], result[1], result[2], result[3], result[4], card_key, result[6])
    else:
        # No position in this group: fall back to the bare user row.
        sql = """select user_id,
                        user_name,
                        party_id,
                        card_id,
                        can_vote
                 from cs_user
                 where user_id = ?
                 order by user_id"""
        result = self._select_one(sql, user_id)
        if result:
            card_key = self.map_cards(cards, result[3])
            result = (result[0], result[1], 0, result[2], group_id, card_key, result[4])
    print('get_user_and_position', result)
    return result


def decouple_user(self, user_id, group_id=0):
    """Delete the position rows of ``user_id`` in ``group_id`` and commit.

    The original left the delete uncommitted and its cursor open; the other
    mutators in this excerpt commit their own changes, so this one does too.
    """
    sql = """delete from cs_group_position where pos_user_id=? and pos_group_id=?"""
    cursor = self._execute(sql, user_id, group_id)
    cursor.close()
    self.db.commit()


def save_position(self, position_id, user_id, group_id=0):
    """Seat ``user_id`` at ``position_id`` in ``group_id`` (insert or update).

    The change is committed on ``self.db``.
    """
    sql_exists = """select pos_id, pos_user_id from cs_group_position where pos_id=? and pos_group_id=?"""
    sql_insert = """insert into cs_group_position(pos_group_id, pos_id, pos_user_id) values(?, ?, ?)"""
    sql_update = """update cs_group_position set pos_user_id=? where pos_id=? and pos_group_id=?"""
    cursor = self._execute(sql_exists, position_id, group_id)
    try:
        res = cursor.fetchone()
        if not res:
            cursor.execute(sql_insert, (group_id, position_id, user_id))
        else:
            cursor.execute(sql_update, (user_id, position_id, group_id))
    finally:
        # Close the cursor even when one of the statements raises.
        cursor.close()
    self.db.commit()


def get_parties(self):
    """Return all party rows ordered by ``party_key``."""
    sql = """select party_id, party_key, party_name, party_logo_name from cs_party order by party_key"""
    return self._select(sql)


def get_party_logo_name(self, party_id=0):
    """Return ``party_logo_name`` for ``party_id``, or ``''`` when unknown."""
    if not party_id:
        return ''
    sql = """select party_logo_name from cs_party
             where party_id = ?
          """
    row = self._select_one(sql, party_id)
    if row:
        return row[0]
    return ''


def get_message(self, message_id=None, message_key=None):
    """Look up a message row by ``message_id`` (preferred) or ``message_key``.

    Returns ``''`` when neither argument is given, the first selected column
    (``message_id``) when a row is found, and 0 otherwise.
    """
    # NOTE(review): the query also selects message_key and message_value, yet
    # only column 0 is returned -- confirm that message_id is what callers
    # expect before changing this.
    if not message_id and not message_key:
        return ''
    if message_id:
        srch_key = message_id
        sql_where = """where message_id=?"""
    else:
        # The guard above guarantees message_key is truthy on this path.
        srch_key = message_key
        sql_where = """where message_key=?"""
    sql = """select message_id, message_key, message_value
             from cs_message """
    row = self._select_one(sql + sql_where, srch_key)
    if row:
        return row[0]
    return 0


def set_message(self, message_id, message_key, message_value):
    """Insert the message row for ``message_id``, or update it when it exists.

    Only the part of this method visible in the excerpt is reproduced.
    """
    sql_exists = """select message_id, message_key, message_value from cs_message where message_id=?"""
    sql_insert = """insert into cs_message(message_id, message_key, message_value) values(?, ?, ?)"""
    sql_update = """update cs_message set message_key=?, message_value=? where message_id=?"""
    cursor = self.db.cursor()
    cursor.execute(sql_exists, (message_id,))
    res = cursor.fetchone()
    if not res:
        cursor.execute(sql_insert, (message_id, message_key, message_value))
    else:
        cursor.execute(sql_update, (message_key, message_value, message_id, ))
    # ... (excerpt ends here; the remainder of this method is not shown)

Full Screen

Full Screen

wykop_parser.py

Source:wykop_parser.py Github

copy

Full Screen

import json
import os
from bs4 import BeautifulSoup
from bs4 import NavigableString


def _select_one(tag, select_string, index):
    """Return the ``index``-th match of ``select_string`` under ``tag``.

    Returns None when ``tag`` is None or there are not enough matches.
    """
    if tag is None:
        return None
    tags = tag.select(select_string)
    if len(tags) > index:
        return tags[index]
    return None


def _select_list(tag, select_string):
    """Return all matches of ``select_string`` under ``tag`` ([] if no tag)."""
    if tag is None:
        return []
    return tag.select(select_string)


def _attr(tag, attr_name):
    """Return attribute ``attr_name`` of ``tag``; None if tag or attr is missing."""
    if tag is None:
        return None
    if attr_name not in tag.attrs:
        return None
    return tag.attrs[attr_name]


def _select_attr_lambda(tag, select_string, index, attr_name, func):
    """Select an element, read ``attr_name`` and return ``func(value)``.

    Returns None (without calling ``func``) when the element or the
    attribute is missing.
    """
    attr = _attr(_select_one(tag, select_string, index), attr_name)
    if attr is None:
        return None
    return func(attr)


def _select_attr(tag, select_string, index, attr_name):
    """Select an element and return its ``attr_name`` attribute, or None."""
    return _attr(_select_one(tag, select_string, index), attr_name)


def _extract_text(tag):
    """Flatten ``tag`` into plain text.

    Strings are stripped, ``<a>`` becomes its ``href`` padded with spaces
    (a single space when it has no ``href``), ``<br>`` becomes a newline and
    any other tag is the concatenation of its children. Returns None for a
    None ``tag``.
    """
    if tag is None:
        return None
    if isinstance(tag, NavigableString):
        return tag.string.strip()
    if tag.name == 'a':
        if 'href' in tag.attrs:
            return ' ' + tag.attrs['href'] + ' '
        return " "
    if tag.name == 'br':
        return "\n"
    # join instead of repeated ``text = text + ...`` to avoid quadratic copying
    return "".join(_extract_text(cn) for cn in tag.contents)


def format_datetime(dt_str):
    """Drop the last ':' of ``dt_str`` so the UTC offset reads ``+0200``.

    Example: ``2017-10-06T13:40:52+02:00`` -> ``2017-10-06T13:40:52+0200``.
    Returns None for None; a string without ':' is returned unchanged.
    """
    if dt_str is None:
        return None
    head, _, tail = dt_str.rpartition(':')
    return head + tail


def _voters_dom(votes_text):
    """Parse a voters response into a DOM.

    The leading ``for(;;);`` guard is removed, the rest is decoded as JSON
    and the HTML found at ``['operations'][2]['html']`` is parsed.
    """
    voters_json = votes_text.replace("for(;;);", "", 1)
    voters_html = json.loads(voters_json)['operations'][2]['html']
    return BeautifulSoup(voters_html, "html.parser")


class WykopPage:
    """
    Class representing data on wykop page.
    """

    def __init__(self, page_text, up_votes_text=None, down_votes_text=None):
        """
        Construct page model from wykop data

        :param page_text: str individual wykop html page
        :param up_votes_text: str string representing response for up votes
        :param down_votes_text: str string representing response for down votes
        """
        page_dom = BeautifulSoup(page_text, "html.parser")
        if up_votes_text is not None:
            self._up_voters_dom = _voters_dom(up_votes_text)
            self._up_voters = WykopVoters(self._up_voters_dom)
        else:
            self._up_voters = WykopVoters(None)
        if down_votes_text is not None:
            self._down_voters_dom = _voters_dom(down_votes_text)
            self._down_voters = WykopVoters(self._down_voters_dom)
        else:
            self._down_voters = WykopVoters(None)
        self._page_ = page_dom
        self._mainPanelTag_ = _select_one(self._page_, "div#site div.grid div.grid-main", 0)
        self._rightPanelTag_ = _select_one(self._page_, "div#site div.grid-right", 0)
        self._contentPanelTag_ = _select_one(self._mainPanelTag_, "div.rbl-block", 0)
        self._votesPanelTag_ = _select_one(self._mainPanelTag_, "div.rbl-block", 1)
        self._discussions_ = WykopDiscussions(_select_one(self._mainPanelTag_, "ul.comments-stream", 0))
        self._infoPanel_ = _select_one(self._rightPanelTag_, "div.information", 0)

    def page_id(self):
        """Return the numeric id parsed from the page url, or None without a url."""
        u = self.url()
        if u is None:
            return None
        url_prefix_len = len("http://www.wykop.pl/link/")
        if u.startswith("https"):
            url_prefix_len = len("https://www.wykop.pl/link/")
        # ``long`` exists only on Python 2; ``int`` is unbounded on Python 3
        # and promotes automatically on Python 2.
        return int(u[url_prefix_len:].split('/')[0])

    def title(self):
        """Return the text of the ``h2 > a`` title link, or None."""
        t = _select_one(self._contentPanelTag_, "h2 > a", 0)
        if t is None:
            return None
        return t.string

    def url(self):
        """Return the ``og:url`` meta content, or None."""
        return _select_attr(self._page_, 'meta[property="og:url"]', 0, 'content')

    def author(self):
        """Return the second-to-last path segment of ``article:author``, or None."""
        return _select_attr_lambda(self._page_, 'meta[property="article:author"]', 0, 'content',
                                   lambda s: s.split('/')[-2])

    def description(self):
        """Return the stripped ``og:description`` meta content, or None."""
        return _select_attr_lambda(self._page_, 'meta[property="og:description"]', 0, 'content',
                                   lambda s: s.strip())

    def keys(self):
        """Return the keywords meta content split on spaces, without ',' and '#'."""
        def split_keys(s):
            return s.replace(',', '').replace('#', '').split(' ')
        return _select_attr_lambda(self._page_, 'meta[name="keywords"]', 0, 'content', split_keys)

    def link(self):
        """Return the ``href`` of the tagline link, or None."""
        return _select_attr(self._page_, 'div.fix-tagline span a.affect', 0, 'href')

    def add_time(self):
        """Return the page's ``datetime`` attribute via ``format_datetime``."""
        return format_datetime(_select_attr(self._page_, 'div.information b time', 0, 'datetime'))

    def display(self):
        """Return the text of the third info-panel counter, or None."""
        ds = _select_one(self._infoPanel_, 'tr.infopanel td b', 2)
        if ds is None:
            return None
        return ds.string

    def up_votes(self):
        """Return the first info-panel counter as int (0 when missing)."""
        ds = _select_one(self._infoPanel_, 'tr.infopanel td b', 0)
        if ds is None:
            return 0
        return int(ds.string)

    def down_votes(self):
        """Return the second info-panel counter as int (0 when missing)."""
        ds = _select_one(self._infoPanel_, 'tr.infopanel td b', 1)
        if ds is None:
            return 0
        return int(ds.string)

    def votes(self):
        """Return ``up_votes() - down_votes()``."""
        return self.up_votes() - self.down_votes()

    def up_voters(self):
        """Return the ``WykopVoters`` built from the up-votes response."""
        return self._up_voters

    def down_voters(self):
        """Return the ``WykopVoters`` built from the down-votes response."""
        return self._down_voters

    def annotation(self):
        """Return the text of ``div.annotation p`` in the content panel, or None."""
        an = _select_one(self._contentPanelTag_, 'div.annotation p', 0)
        if an is None:
            return None
        return an.string

    def discussions(self):
        """Return the ``WykopDiscussions`` parsed from the comments stream."""
        return self._discussions_

    def to_dict(self):
        """Return every page field as a plain dict."""
        return {
            'page_id': self.page_id(),
            'title': self.title(),
            'url': self.url(),
            'author': self.author(),
            'annotation': self.annotation(),
            'description': self.description(),
            'keys': self.keys(),
            'link': self.link(),
            'add_time': self.add_time(),
            'display': self.display(),
            'votes': self.votes(),
            'up_votes': self.up_votes(),
            'down_votes': self.down_votes(),
            'discussions': self.discussions().to_dict(),
            'up_voters': self.up_voters().to_dict(),
            'down_voters': self.down_voters().to_dict(),
        }


class WykopVoters:
    """The ``div.usercard`` entries of a voters DOM, wrapped as ``WykopVote``."""

    def __init__(self, voters_tag):
        self._voters_tag = voters_tag
        # ``_select_list`` already returns [] for a None tag.
        self._votes = [WykopVote(v) for v in _select_list(self._voters_tag, 'div.usercard')]

    def votes(self):
        """Return the list of ``WykopVote`` objects."""
        return self._votes

    def to_dict(self):
        """Return the votes as a list of dicts."""
        return [vt.to_dict() for vt in self.votes()]


class WykopVote:
    """A single vote, backed by its user-card ``div``."""

    def __init__(self, vote_div):
        self._vote_div = vote_div

    def user(self):
        """Return the voter's name, or None when it cannot be found."""
        # ``a["title"]`` is not a valid CSS attribute selector (the attribute
        # name must be a bare identifier); ``a[title]`` matches links that
        # carry a title attribute.
        u = _select_attr(self._vote_div, 'a[title]', 0, 'title')
        # When user removed account there is no link. Try to extract from b tag
        if u is None:
            u = _extract_text(_select_one(self._vote_div, 'b', 0))
        return u

    def vote_time(self):
        """Return the vote's ``datetime`` attribute via ``format_datetime``."""
        return format_datetime(_select_attr(self._vote_div, 'span.info time', 0, 'datetime'))

    def to_dict(self):
        """Return the vote as a plain dict."""
        return {
            'user': self.user(),
            'vote_time': self.vote_time()
        }


class WykopDiscussions:
    """The ``li.iC`` threads of a comments panel, wrapped as ``WykopDiscussion``."""

    def __init__(self, comments_panel_tag):
        self._comments_panel_tag_ = comments_panel_tag
        self._discusions_ = [WykopDiscussion(dis)
                             for dis in _select_list(self._comments_panel_tag_, "li.iC")]
        self._index = len(self._discusions_)

    def len(self):
        """Return the number of discussions."""
        return len(self._discusions_)

    def all(self):
        """Return the list of ``WykopDiscussion`` objects."""
        return self._discusions_

    def to_dict(self):
        """Return the discussions as a list of dicts."""
        return [d.to_dict() for d in self.all()]


class WykopDiscussion:
    """One thread: a main comment plus its responses."""

    def __init__(self, discussion_li_tag):
        self.discussion_li_tag = discussion_li_tag
        self._main_comment_ = WykopComment(_select_one(self.discussion_li_tag, "div", 0))
        self._responses_ = [
            WykopComment(div)
            for div in _select_list(self.discussion_li_tag, "ul li > div.wblock.lcontrast.dC")
        ]

    def main_comment(self):
        """Return the thread's first comment."""
        return self._main_comment_

    def responses(self):
        """Return the list of response comments."""
        return self._responses_

    def to_dict(self):
        """Return the thread as ``{'main': ..., 'responses': [...]}``."""
        return {
            'main': self.main_comment().to_dict(),
            'responses': [r.to_dict() for r in self.responses()]
        }


class WykopComment:
    """A single comment, backed by its ``div`` tag (which may be None)."""

    def __init__(self, comment_div_tag):
        self._comment_div_tag_ = comment_div_tag

    def comment_id(self):
        """Return the ``data-id`` attribute of the comment div, or None."""
        return _attr(self._comment_div_tag_, 'data-id')

    def author(self):
        """Return the second-to-last path segment of the profile link, or None."""
        return _select_attr_lambda(self._comment_div_tag_, 'a.profile', 0, 'href',
                                   lambda s: s.split('/')[-2])

    def add_time(self):
        """Return the comment's ``datetime`` attribute via ``format_datetime``."""
        return format_datetime(_select_attr(self._comment_div_tag_, 'div.author time', 0, 'datetime'))

    def up_votes(self):
        """Return ``abs(int(data-vcp))``, or 0 when the attribute is missing."""
        up = _select_attr(self._comment_div_tag_, 'div.author p.vC', 0, 'data-vcp')
        if up is None:
            return 0
        return abs(int(up))

    def down_votes(self):
        """Return ``abs(int(data-vcm))``, or 0 when the attribute is missing."""
        dw = _select_attr(self._comment_div_tag_, 'div.author p.vC', 0, 'data-vcm')
        if dw is None:
            return 0
        return abs(int(dw))

    def votes(self):
        """Return ``up_votes() - down_votes()``."""
        return self.up_votes() - self.down_votes()

    def content(self):
        """Return the stripped text of ``div.text``, or None when absent."""
        text = _extract_text(_select_one(self._comment_div_tag_, 'div.text', 0))
        if text is None:
            return None
        return text.strip()

    def to_dict(self):
        """Return the comment as a plain dict."""
        return {
            'comment_id': self.comment_id(),
            'author': self.author(),
            'add_time': self.add_time(),
            'votes': self.votes(),
            'up_votes': self.up_votes(),
            'down_votes': self.down_votes(),
            'content': self.content(),
        }

# ... (excerpt ends here, at the opening of a triple-quoted string that is not shown)

Full Screen

Full Screen

oddsparser.py

Source:oddsparser.py Github

copy

Full Screen

# NOTE: this excerpt comes from inside a class whose header is not shown.
# Every function below is a method of that class (note the ``self``
# parameter) and must be indented one level when placed back in the class.
#
# ... (excerpt begins mid-method; visible tail of the preceding method)
#     self.days = dict()
#     self.current_date = None
#     self.games = list()


def _select_one(self, element, selector):
    """Return the single element matching ``selector`` under ``element``.

    Returns None when nothing matches.

    Raises:
        RuntimeError: more than one element matches.
    """
    elist = element.select(selector)
    if len(elist) > 1:
        # Call form instead of the Python 2-only ``raise RuntimeError, "..."``
        # statement, which is a SyntaxError on Python 3.
        raise RuntimeError("Too many %s" % selector)
    if elist:
        return elist.pop()
    return None


def get_event_schedule(self):
    """Return the ``#event-schedule`` element of ``self.soup`` (or None)."""
    return self._select_one(self.soup, '#event-schedule')


def handle_event(self, event):
    """Parse one event element into a game dict and record it.

    The dict (keys ``gametime``, ``home``, ``away``, ``home_line``,
    ``away_line``, ``total``) is appended to ``self.days[self.current_date]``
    and to ``self.games``.
    """
    # ``AMPM``, ``game_time_format``, ``one_hour`` and ``datetime`` are names
    # defined outside this excerpt.
    gdate = self.current_date
    head, body = event.children
    gtime, ampm = head.next.text.split()
    gtimestr = '%s %s' % (gtime, AMPM[ampm])
    gtime = datetime.strptime(gtimestr, game_time_format)
    # Date comes from the current schedule day, time of day from the event.
    gametime = datetime.combine(gdate.date(), gtime.time()) - one_hour

    away_tr = self._select_one(body, '.away')
    competitor = self._select_one(away_tr, '.competitor-name')
    away_name = competitor.next.text
    runline_td = self._select_one(away_tr, '.runline')
    # split/join removes all whitespace from the run line text
    away_runline = ''.join(runline_td.next.text.split())

    totalnumber_div = self._select_one(away_tr, '.total-number')
    totalnumber = totalnumber_div.next.text

    home_tr = self._select_one(body, '.home')
    competitor = self._select_one(home_tr, '.competitor-name')
    home_name = competitor.next.text
    runline_td = self._select_one(home_tr, '.runline')
    home_runline = ''.join(runline_td.next.text.split())

    data = dict(gametime=gametime, home=home_name, away=away_name,
                home_line=home_runline, away_line=away_runline,
                total=totalnumber)
    self.days[gdate].append(data)
    self.games.append(data)


def handle_event_schedule_child(self, child):
    """Dispatch one child of the event schedule by its ``class`` attribute.

    A ``schedule-date`` child opens a new day (``self.days[date] = []``) and
    becomes ``self.current_date``; an ``event`` child is passed to
    ``handle_event``. Any other child is ignored by the visible code.
    """
    classes = child.attrs.get('class')
    if classes == ['schedule-date']:
        date = self.get_date(child)
        self.days[date] = []
        self.current_date = date
    elif classes == ['event']:
        self.handle_event(child)
    # ... (excerpt ends here; any remainder of this method is not shown)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub — from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run yandex-tank automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful