Best Python code snippet using yandex-tank
database.py
Source:database.py  
...110        db = self.db111        cursor = db.cursor()112        cursor.execute(sql, args)113        return cursor114    def _select_one(self, sql, *args):115        cursor = self._execute(sql, *args)116        result = cursor.fetchone()117        return result118    def _select(self, sql, *args):119        cursor = self._execute(sql, *args)120        result = cursor.fetchall()121        return result122    def get_cards(self):123        sql = """select cs_card.card_id, card_key124                        from cs_card 125                 inner join main.cs_user on cs_card.card_id = cs_user.card_id"""126        result = self._select(sql)127        return result128    def find_card(self, card_key):129        # -1 is not found130        sql = """select card_id from cs_card where card_key=?"""131        result = self._select_one(sql, card_key)132        if not result:133            return -1134        return result[0]135    def find_user_can_vote_by_card(self, card_key):136        card_id = self.find_card(card_key)137        if card_id <= 0:138            return 0139        sql = """select can_vote from main.cs_user where card_id = ?"""140        result = self._select_one(sql, card_id)141        if result:142            if not result[0]:143                return 0144            return 1145        return 0146    def find_user_by_card(self, card_key):147        card_id = self.find_card(card_key)148        if card_id <= 0:149            return 0150        sql = """select user_id from cs_user where card_id = ?"""151        result = self._select_one(sql, card_id)152        if result:153            if not result[0]:154                return 0155            return result[0]156        return 0157    def couple_card(self, person_id, card_key):158        card_id = self.find_card(card_key)159        if card_id < 0:160            _ = self.add_card(card_key)161            card_id = self.find_card(card_key)162            if card_id < 0:163                raise BaseException('')164 
       user_id = self.find_card_user(card_id)165        if user_id == person_id:166            # already ok167            return person_id, 0168        # remove card_id from old user169        print('ok')170        if user_id > 0:171            sql = """update cs_user set card_id=0 where user_id=?"""172            self._execute(sql, user_id)173        # set card_id for new user174        sql = """update cs_user set card_id=? where user_id=?"""175        self._execute(sql, card_id, person_id)176        self.commit()177        return person_id, user_id178    def add_card(self, card_key):179        sql = """select card_id from cs_card where card_key=?"""180        res = self._select_one(sql, card_key)181        if res:182            print('card_key exists')183            return res[0]184        sql = """insert into cs_card(card_key) values(?)"""185        self._execute(sql, card_key)186        self.commit()187        sql = """select card_id from cs_card where card_key=?"""188        res = self._select_one(sql, card_key)189        return res[0]190    def find_card_user(self, card_id):191        sql = """select user_id from cs_user where card_id = ?"""192        res = self._select_one(sql, card_id)193        if not res:194            return 0195        return res[0]196    def get_users(self):197        sql = """select user_id, user_key, user_name, party_id 198                from cs_user  """199        result = self._select(sql)200        return result201    def get_user(self, user_id=None, user_key=None  ):202        if not user_key and not user_id:203            return []204        sql = """select user_id, user_key, user_name, party_id from cs_user """205        sql_where = ''206        if user_id:207            sql_where = """where user_id = ?"""208            srch = user_id209        elif user_key:210            sql_where = """where user_key = ?"""211            srch = user_key212        sql = sql + sql_where213        result = self._select_one(sql, srch)214        
return result215    def get_user_full(self, user_id):216        if not user_id:217            return []218        sql = """select user_id, user_key, user_name, party_id, card_id, can_vote from cs_user """219        sql_where = ''220        sql_where = """where user_id = ?"""221        srch = user_id222        sql = sql + sql_where223        result = self._select_one(sql, srch)224        return result225    def save_user(self, user_id=None, user_key='', user_name='', party_id=None, can_vote=0):226        # sql = """227        #     insert into cs_user(user_id, user_key, user_name, party_id)228        #     values(?, ?, ?, ?)229        #     on confict(user_id)230        #     do update set user_key = ?, user_name=?, party_id=?231        #     """232        if isinstance(can_vote, (str, bytes)) and can_vote.isdigit():233            can_vote = int(can_vote)234        if can_vote:235            can_vote = 1236        else:237            can_vote = 0238        sql_exists = """select user_id from cs_user where user_id=?"""239        sql_insert = """240                    insert into cs_user(user_id, user_key, user_name, party_id, can_vote)241                    values(?, ?, ?, ?, ?)242                    243                    """244        sql_update = """update cs_user set user_key = ?, user_name=?, party_id=?, can_vote=? 
where user_id=?"""245        cursor = self._execute(sql_exists, user_id)246        res = cursor.fetchone()247        if not res:248            #print('insert')249            cursor.execute(sql_insert, (user_id, user_key, user_name, party_id, can_vote))250        else:251            #print('update')252            cursor.execute(sql_update, (user_key, user_name, party_id, can_vote, user_id))253        cursor.close()254        self.db.commit()255    def get_positions(self):256        sql = """select pos_id, pos_user_id, pos_group_id from cs_group_position"""257        result = self._select(sql)258        return result259    def get_user_from_position(self, position_id, group_id=0):260        sql = """select pos_user_id from cs_group_position where pos_id=? and pos_group_id=?"""261        result = self._select_one(sql, position_id, group_id)262        print('db get_user_from_position', result)263        if result:264            return result[0]265        return 0266    def get_users_and_positions(self, group_id=0):267        # sql = """select user_id,268        #         user_name,269        #         pos_id,270        #         party_id,271        #         pos_group_id272        #         from cs_user273        #         left outer join cs_group_position274        #             on cs_user.user_id = cs_group_position.pos_user_id275        #         left out join cs_card on cs_card.card_id = cs_user.card_id276        #         where pos_group_id = ?277        #         order by user_name"""278        sql = """select user_id,279                        user_name,280                        party_id,281                        card_id,282                        can_vote283                        from cs_user 284                        order by user_name"""285        users = self._select(sql)286        sql = """select pos_user_id,287                        pos_id,288                        pos_group_id from289                        cs_group_position  290                     
   where pos_group_id = ?291                        """292        positions = self._select(sql, group_id)293        sql = """select card_id, card_key from cs_card order by card_id"""294        cards = self._select(sql)295        result = []296        for user in users:297            mic_id = 0 # mic_id ~ pos_id298            card_key = ''299            #found = False300            for position in positions:301                if user[0] == position[0]:302                    mic_id = position[1]303                    #found = True304                    break305            for card in cards:306                if user[3] == card[0]:307                    card_key = card[1]308                    break309            #if found:database.py310            result.append((user[0], user[1], mic_id, user[2], group_id, card_key, user[4]))311        #for r in result:312        #    print(r)313        return result314    def map_cards(self, cards, card_id):315        card_key = ''316        for card_id_val, card_key_val in cards:317            if card_id == card_id_val:318                return card_key_val319        return ''320    def get_user_and_position(self, user_id, group_id=0):321        cards = self.get_cards()322        sql = """select user_id,323                user_name,324                pos_id,325                party_id,326                pos_group_id, 327                card_id,328                can_vote329                from cs_user 330                left outer join cs_group_position on cs_user.user_id = cs_group_position.pos_user_id331                where user_id = ? 
and pos_group_id=?332                order by user_id"""333        result = self._select_one(sql, user_id, group_id)334        print(result, '----')335        if not result:336            sql = """select user_id,337                            user_name,338                            party_id,339                            card_id, 340                            can_vote341                            from cs_user 342                            where user_id = ?343                            order by user_id"""344            result = self._select_one(sql, user_id)345            if result:346                card_key = self.map_cards(cards, result[3])347                result = (result[0], result[1], 0, result[2], group_id, card_key, result[4])348        else:349            card_key = self.map_cards(cards, result[5])350            result = (result[0], result[1], result[2], result[3], result[4], card_key, result[6])351            print('get_user_and_position', result)352        return result353    def decouple_user(self, user_id, group_id=0):354        sql = """delete from  cs_group_position where pos_user_id=? and pos_group_id=?"""355        cursor = self._execute(sql, user_id, group_id)356    def save_position(self, position_id, user_id, group_id=0):357        # clear the position358        sql_exists = """select pos_id, pos_user_id from cs_group_position where pos_id=? and pos_group_id=?"""359        sql_insert = """insert into cs_group_position(pos_group_id, pos_id, pos_user_id) values(?, ?, ?)"""360        sql_update = """update cs_group_position set pos_user_id=? where pos_id=? 
and pos_group_id=?"""361        cursor = self._execute(sql_exists, position_id, group_id)362        res = cursor.fetchone()363        if not res:364            #print('insert')365            cursor.execute(sql_insert, (group_id, position_id, user_id))366        else:367            #print('update')368            cursor.execute(sql_update, (user_id, position_id, group_id))369        cursor.close()370        self.db.commit()371    def get_parties(self):372        sql = """select party_id, party_key, party_name, party_logo_name from  cs_party order by party_key"""373        result = self._select(sql)374        return result375    def get_party_logo_name(self, party_id=0):376        if not party_id:377            return ''378        sql = """select party_logo_name from  cs_party379                 where party_id = ?380        381        """382        result = self._select_one(sql, party_id)383        if result:384            party_logo_name = result[0]385            #party_logo_name = party_logo_name[0]386            return party_logo_name387        return ''388    def get_message(self, message_id=None, message_key=None):389        if not message_id and not message_key:390            return ''391        if message_id:392            srch_key = message_id393            sql_where = """where message_id=?"""394        elif message_key:395            srch_key = message_key396            sql_where = """where message_key=?"""397        sql = """select message_id, message_key, message_value 398                    from cs_message """399        sql = sql + sql_where400        result = self._select_one(sql, srch_key)401        if result:402            return result[0]403        return 0404    def set_message(self, message_id, message_key, message_value):405        sql_exists = """select message_id, message_key, message_value from cs_message where message_id=?"""406        sql_insert = """insert into cs_message(message_id, message_key, message_value) values(?, ?, ?)"""407        
sql_update = """update cs_message set message_key=?, message_value=? where message_id=?"""408        cursor = self.db.cursor()409        cursor.execute(sql_exists, (message_id,))410        res = cursor.fetchone()411        if not res:412            cursor.execute(sql_insert, (message_id, message_key, message_value))413        else:414            cursor.execute(sql_update, (message_key, message_value, message_id, ))...wykop_parser.py
Source:wykop_parser.py  
import json
import os

from bs4 import BeautifulSoup
from bs4 import NavigableString


def _select_one(tag, select_string, index):
    """Return the *index*-th match of *select_string* under *tag*, or None.

    None is returned when *tag* is None or there are not enough matches.
    """
    if tag is None:
        return None
    tags = tag.select(select_string)
    if len(tags) > index:
        return tags[index]
    return None


def _select_list(tag, select_string):
    """Return all matches of *select_string* under *tag* (``[]`` if *tag* is None)."""
    if tag is None:
        return []
    return tag.select(select_string)


def _attr(tag, attr_name):
    """Return attribute *attr_name* of *tag*, or None if either is missing."""
    if tag is None:
        return None
    if attr_name not in tag.attrs:
        return None
    return tag.attrs[attr_name]


def _select_attr(tag, select_string, index, attr_name):
    """Return *attr_name* of the *index*-th match of *select_string*, or None."""
    return _attr(_select_one(tag, select_string, index), attr_name)


def _select_attr_lambda(tag, select_string, index, attr_name, func):
    """Like ``_select_attr`` but pass a found attribute through *func*."""
    attr = _select_attr(tag, select_string, index, attr_name)
    if attr is None:
        return None
    return func(attr)


def _extract_text(tag):
    """Flatten *tag* to text.

    Strings are stripped, ``<a>`` becomes its ``href`` padded with spaces (a
    single space when there is no ``href``), ``<br>`` becomes a newline, and
    any other tag is the concatenation of its children. Returns None for None.
    """
    if tag is None:
        return None
    if isinstance(tag, NavigableString):
        return tag.string.strip()
    if tag.name == 'a':
        if 'href' in tag.attrs:
            return ' ' + tag.attrs['href'] + ' '
        return " "
    if tag.name == 'br':
        return "\n"
    return "".join(_extract_text(child) for child in tag.contents)


def format_datetime(dt_str):
    """Drop the last ``:`` from *dt_str*; return None for None.

    Example: ``2017-10-06T13:40:52+02:00`` -> ``2017-10-06T13:40:52+0200``.
    A string without any ``:`` is returned unchanged.
    """
    if dt_str is None:
        return None
    parts = dt_str.split(':')
    return ":".join(parts[:-1]) + parts[-1]


def _parse_voters_dom(votes_text):
    """Parse a voters response into a DOM.

    The first ``for(;;);`` prefix is removed, the remainder is decoded as
    JSON, and ``['operations'][2]['html']`` is parsed with ``html.parser``.
    """
    voters_json = votes_text.replace("for(;;);", "", 1)
    voters_html = json.loads(voters_json)['operations'][2]['html']
    return BeautifulSoup(voters_html, "html.parser")


class WykopPage:
    """
    Class representing data on wykop page.
    """

    def __init__(self, page_text, up_votes_text=None, down_votes_text=None):
        """
        Construct page model from wykop data
        :param page_text: str individual wykop html page
        :param up_votes_text: str string representing response for up votes
        :param down_votes_text: str string representing response for down votes
        """
        page_dom = BeautifulSoup(page_text, "html.parser")
        if up_votes_text is not None:
            self._up_voters_dom = _parse_voters_dom(up_votes_text)
            self._up_voters = WykopVoters(self._up_voters_dom)
        else:
            self._up_voters = WykopVoters(None)
        if down_votes_text is not None:
            self._down_voters_dom = _parse_voters_dom(down_votes_text)
            self._down_voters = WykopVoters(self._down_voters_dom)
        else:
            self._down_voters = WykopVoters(None)
        self._page_ = page_dom
        self._mainPanelTag_ = _select_one(self._page_, "div#site div.grid div.grid-main", 0)
        self._rightPanelTag_ = _select_one(self._page_, "div#site div.grid-right", 0)
        self._contentPanelTag_ = _select_one(self._mainPanelTag_, "div.rbl-block", 0)
        self._votesPanelTag_ = _select_one(self._mainPanelTag_, "div.rbl-block", 1)
        self._discussions_ = WykopDiscussions(_select_one(self._mainPanelTag_, "ul.comments-stream", 0))
        self._infoPanel_ = _select_one(self._rightPanelTag_, "div.information", 0)

    def page_id(self):
        """Return the numeric id following ``/link/`` in the page URL, or None."""
        u = self.url()
        if u is None:
            return None
        if u.startswith("https"):
            prefix = "https://www.wykop.pl/link/"
        else:
            prefix = "http://www.wykop.pl/link/"
        # int() replaces the Python-2-only long(); on Python 2 int() already
        # promotes to long for large values, so results are unchanged there.
        return int(u[len(prefix):].split('/')[0])

    def title(self):
        """Return the string of the first ``h2 > a`` in the content panel, or None."""
        t = _select_one(self._contentPanelTag_, "h2 > a", 0)
        if t is None:
            return None
        return t.string

    def url(self):
        """Return the ``og:url`` meta content, or None."""
        return _select_attr(self._page_, 'meta[property="og:url"]', 0, 'content')

    def author(self):
        """Return the second-to-last ``/``-separated part of ``article:author``."""
        return _select_attr_lambda(self._page_, 'meta[property="article:author"]', 0, 'content',
                                   lambda s: s.split('/')[-2])

    def description(self):
        """Return the stripped ``og:description`` meta content, or None."""
        return _select_attr_lambda(self._page_, 'meta[property="og:description"]', 0, 'content', lambda s: s.strip())

    def keys(self):
        """Return the ``keywords`` meta content split on spaces, minus ``,`` and ``#``."""
        def split_keys(s):
            return s.replace(',', '').replace('#', '').split(' ')
        return _select_attr_lambda(self._page_, 'meta[name="keywords"]', 0, 'content', split_keys)

    def link(self):
        """Return the ``href`` of ``div.fix-tagline span a.affect``, or None."""
        return _select_attr(self._page_, 'div.fix-tagline span a.affect', 0, 'href')

    def add_time(self):
        """Return the ``datetime`` of the information-panel ``time`` tag, reformatted."""
        return format_datetime(_select_attr(self._page_, 'div.information b time', 0, 'datetime'))

    def display(self):
        """Return the string of the third ``tr.infopanel td b`` cell, or None."""
        ds = _select_one(self._infoPanel_, 'tr.infopanel td b', 2)
        if ds is None:
            return None
        return ds.string

    def up_votes(self):
        """Return the first ``tr.infopanel td b`` cell as int (0 if missing)."""
        ds = _select_one(self._infoPanel_, 'tr.infopanel td b', 0)
        if ds is None:
            return 0
        return int(ds.string)

    def down_votes(self):
        """Return the second ``tr.infopanel td b`` cell as int (0 if missing)."""
        ds = _select_one(self._infoPanel_, 'tr.infopanel td b', 1)
        if ds is None:
            return 0
        return int(ds.string)

    def votes(self):
        """Return ``up_votes() - down_votes()``."""
        return self.up_votes() - self.down_votes()

    def up_voters(self):
        """Return the ``WykopVoters`` built from the up-votes response."""
        return self._up_voters

    def down_voters(self):
        """Return the ``WykopVoters`` built from the down-votes response."""
        return self._down_voters

    def annotation(self):
        """Return the string of ``div.annotation p`` in the content panel, or None."""
        an = _select_one(self._contentPanelTag_, 'div.annotation p', 0)
        if an is None:
            return None
        return an.string

    def discussions(self):
        """Return the ``WykopDiscussions`` parsed from the comments stream."""
        return self._discussions_

    def to_dict(self):
        """Return all page fields as a plain dict."""
        return {
            'page_id': self.page_id(),
            'title': self.title(),
            'url': self.url(),
            'author': self.author(),
            'annotation': self.annotation(),
            'description': self.description(),
            'keys': self.keys(),
            'link': self.link(),
            'add_time': self.add_time(),
            'display': self.display(),
            'votes': self.votes(),
            'up_votes': self.up_votes(),
            'down_votes': self.down_votes(),
            'discussions': self.discussions().to_dict(),
            'up_voters': self.up_voters().to_dict(),
            'down_voters': self.down_voters().to_dict(),
        }


class WykopVoters:
    """Collection of ``WykopVote`` objects, one per ``div.usercard``."""

    def __init__(self, voters_tag):
        self._voters_tag = voters_tag
        # _select_list returns [] for a None tag, so no separate None check.
        self._votes = [WykopVote(v) for v in _select_list(voters_tag, 'div.usercard')]

    def votes(self):
        """Return the list of ``WykopVote`` objects."""
        return self._votes

    def to_dict(self):
        """Return the votes as a list of dicts."""
        return [vt.to_dict() for vt in self.votes()]


class WykopVote:
    """A single vote, wrapping one user-card tag."""

    def __init__(self, vote_div):
        self._vote_div = vote_div

    def user(self):
        """Return the voter's name from the link title, else from the ``b`` tag."""
        # NOTE(review): 'a["title"]' quotes the attribute name, which is not
        # standard CSS (a[title]) -- confirm the bs4 version in use accepts it.
        u = _select_attr(self._vote_div, 'a["title"]', 0, 'title')
        # When user removed account there is no link. Try to extract from b tag
        if u is None:
            u = _extract_text(_select_one(self._vote_div, 'b', 0))
        return u

    def vote_time(self):
        """Return the ``datetime`` of ``span.info time``, reformatted."""
        return format_datetime(_select_attr(self._vote_div, 'span.info time', 0, 'datetime'))

    def to_dict(self):
        """Return the vote as a dict."""
        return {
            'user': self.user(),
            'vote_time': self.vote_time()
        }


class WykopDiscussions:
    """Collection of ``WykopDiscussion`` objects, one per ``li.iC``."""

    def __init__(self, comments_panel_tag):
        self._comments_panel_tag_ = comments_panel_tag
        self._discusions_ = [
            WykopDiscussion(dis)
            for dis in _select_list(self._comments_panel_tag_, "li.iC")
        ]
        self._index = len(self._discusions_)

    def len(self):
        """Return the number of discussions."""
        return len(self._discusions_)

    def all(self):
        """Return the list of ``WykopDiscussion`` objects."""
        return self._discusions_

    def to_dict(self):
        """Return the discussions as a list of dicts."""
        return [d.to_dict() for d in self.all()]


class WykopDiscussion:
    """One discussion: a main comment plus its responses."""

    def __init__(self, discussion_li_tag):
        self.discussion_li_tag = discussion_li_tag
        self._main_comment_ = WykopComment(_select_one(self.discussion_li_tag, "div", 0))
        self._responses_ = [
            WykopComment(div)
            for div in _select_list(self.discussion_li_tag, "ul li > div.wblock.lcontrast.dC")
        ]

    def main_comment(self):
        """Return the main ``WykopComment``."""
        return self._main_comment_

    def responses(self):
        """Return the list of response ``WykopComment`` objects."""
        return self._responses_

    def to_dict(self):
        """Return the discussion as ``{'main': ..., 'responses': [...]}``."""
        return {
            'main': self.main_comment().to_dict(),
            'responses': [r.to_dict() for r in self.responses()]
        }


class WykopComment:
    """A single comment, wrapping one comment ``div`` tag (may be None)."""

    def __init__(self, comment_div_tag):
        self._comment_div_tag_ = comment_div_tag

    def comment_id(self):
        """Return the ``data-id`` attribute of the comment tag, or None."""
        return _attr(self._comment_div_tag_, 'data-id')

    def author(self):
        """Return the second-to-last ``/``-separated part of the profile link."""
        return _select_attr_lambda(self._comment_div_tag_, 'a.profile', 0, 'href', lambda s: s.split('/')[-2])

    def add_time(self):
        """Return the ``datetime`` of ``div.author time``, reformatted."""
        return format_datetime(_select_attr(self._comment_div_tag_, 'div.author time', 0, 'datetime'))

    def up_votes(self):
        """Return ``abs(int(data-vcp))`` of ``div.author p.vC`` (0 if missing)."""
        up = _select_attr(self._comment_div_tag_, 'div.author p.vC', 0, 'data-vcp')
        if up is None:
            return 0
        return abs(int(up))

    def down_votes(self):
        """Return ``abs(int(data-vcm))`` of ``div.author p.vC`` (0 if missing)."""
        dw = _select_attr(self._comment_div_tag_, 'div.author p.vC', 0, 'data-vcm')
        if dw is None:
            return 0
        return abs(int(dw))

    def votes(self):
        """Return ``up_votes() - down_votes()``."""
        return self.up_votes() - self.down_votes()

    def content(self):
        """Return the stripped text of ``div.text``, or None when absent."""
        text = _extract_text(_select_one(self._comment_div_tag_, 'div.text', 0))
        if text is None:
            return None
        return text.strip()

    def to_dict(self):
        """Return the comment as a dict."""
        return {
            'comment_id': self.comment_id(),
            'author': self.author(),
            'add_time': self.add_time(),
            'votes': self.votes(),
            'up_votes': self.up_votes(),
            'down_votes': self.down_votes(),
            'content': self.content(),
        }
# ... (excerpt truncated here)
oddsparser.py
Source:oddsparser.py  
...20        self.days = dict()21        self.current_date = None22        self.games = list()23        24    def _select_one(self, element, selector):25        elist = element.select(selector)26        if len(elist) > 1:27            raise RuntimeError, "Too many %s" % selector28        if elist:29            return elist.pop()30        31    def get_event_schedule(self):32        return self._select_one(self.soup, '#event-schedule')33    def handle_event(self, event):34        gdate = self.current_date35        head, body = event.children36        gtime, ampm = head.next.text.split()37        gtimestr = '%s %s' % (gtime, AMPM[ampm])38        gtime = datetime.strptime(gtimestr, game_time_format)39        d = gdate.date()40        t = gtime.time()41        gametime = datetime.combine(d, t) - one_hour42        away_tr = self._select_one(body, '.away')43        competitor = self._select_one(away_tr, '.competitor-name')44        away_name = competitor.next.text45        runline_td = self._select_one(away_tr, '.runline')46        away_runline = ''.join(runline_td.next.text.split())47        48        totalnumber_div = self._select_one(away_tr, '.total-number')49        totalnumber = totalnumber_div.next.text50        home_tr = self._select_one(body, '.home')51        competitor = self._select_one(home_tr, '.competitor-name')52        home_name = competitor.next.text53        runline_td = self._select_one(home_tr, '.runline')54        home_runline = ''.join(runline_td.next.text.split())55        data = dict(gametime=gametime, home=home_name, away=away_name,56                    home_line=home_runline, away_line=away_runline,57                    total=totalnumber)58        self.days[gdate].append(data)59        self.games.append(data)60    61    def handle_event_schedule_child(self, child):62        if 'class' in child.attrs and child['class'] == ['schedule-date']:63            date = self.get_date(child)64            self.days[date] = []65            self.current_date 
= date66        elif 'class' in child.attrs and child['class'] == ['event']:67            self.handle_event(child)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
