Best Python code snippet using lemoncheesecake
irc.py
Source:irc.py  
1import asyncio2import time3import re4import base645import random6import traceback7class ParseError(BaseException):8    def __init__(self, desc, line, cursor):9        self.desc = desc10        self.line = line11        self.cursor = cursor12def parse_nuh(s):13    if not s: return None14    nuh = {}15    if '!' in s:16        nuh['nick'] = s[:s.find('!')]17        nuh['ident'] = s[s.find('!')+1:s.find('@')]18        nuh['host'] = s[s.find('@')+1:]19    else:20        nuh['nick'] = s21        nuh['ident'] = ''22        nuh['host'] = ''23    return nuh24def create_nuh(nuh):25    if nuh['ident'] != '':26        return f"{nuh['nick']}!{nuh['ident']}@{nuh['host']}"27    return nuh['nick']28class IRCLine:29    WHITESPACE = ' '30    TAGS_INDICATOR = '@'31    SOURCE_INDICATOR = ':'32    TAG_CLIENT_PREFIX = '+'33    TAG_SEPARATOR = ';'34    TAG_VALID_KEY_PATTERN = re.compile('[a-zA-Z\\d\\-]+')35    TAG_ESCAPE_CHARS = {':': ';', 's': ' ', '\\': '\\', 'r': '\r', 'n': '\n'}36    TAG_ESCAPE_CHARS_REV = {';': ':', ' ': 's', '\\': '\\', '\r': 'r', '\n': 'n'}37    VALID_VERB_PATTERN = re.compile('\\d{3}|[A-Za-z]+')  # lowercase letters aren't technically allowed38    def __init__(self, line=None, do_tags=True, tags=None, source=None, verb=None, params=None):39        if tags is None:40            tags = {}41        if params is None:42            params = []43        self._cursor = 044        self.line = line45        self._do_tags = do_tags46        self.tags = tags47        self.source = parse_nuh(source)48        self.sourceraw = source49        self.verb = verb50        self.params = params51    def _parse_error(self, desc):52        return ParseError(desc, self.line, self._cursor)53    def _parse_complete(self):54        return self._cursor >= len(self.line)55    # technically there should not be a bunch of whitespace in a row, but IRC servers are weird and not necessarily56    # standards-compliant57    def _skip_whitespace(self):58        while not self._parse_complete():59            if self.line[self._cursor] != IRCLine.WHITESPACE:60                return61            self._cursor += 162    def _parse_tags(self):63        self._skip_whitespace()64        if self._parse_complete(): raise self._parse_error("Line ended while parsing tags")65        if self.line[self._cursor] != IRCLine.TAGS_INDICATOR: return  # no tags here66        self._cursor += 167        if self._parse_complete(): raise self._parse_error("Line ended while parsing first tag")68        while self._parse_tag(): pass69    def _parse_tag(self):70        tag = {'key': None, 'vendor': None, 'value': '', 'client': False}71        rawvendor = None72        def check_set_key(s):73            if not re.fullmatch(IRCLine.TAG_VALID_KEY_PATTERN, s):74                raise self._parse_error("Invalid tag key")75            tag['key'] = s76        def unescape_val(s):77            ret = ''78            escape = False79            for ch in s:80                if escape:81                    escape = False82                    if ch in IRCLine.TAG_ESCAPE_CHARS:83                        ret += IRCLine.TAG_ESCAPE_CHARS[ch]84                    else:85                        ret += ch86                elif ch == '\\':87                    escape = True88                else:89                    ret += ch90            return ret91        if self.line[self._cursor] == IRCLine.TAG_CLIENT_PREFIX:92            tag['client'] = True93            self._cursor += 194            if self._parse_complete(): raise self._parse_error("Line ended while parsing tag key")95        somestr = ''96        while not self._parse_complete():97            curch = self.line[self._cursor]98            if tag['vendor'] is None and curch == '/':  # Vendor found99                if len(somestr) == 0: raise self._parse_error("Empty vendor found while parsing tag")100                rawvendor = somestr101                try:102                    tag['vendor'] = rawvendor.encode().decode('idna')103                except UnicodeError as uerr:104                    print(f'Failed parsing IDNA vendor: {uerr}')105                    tag['vendor'] = rawvendor  # TODO: Handle this properly106                somestr = ''107            elif tag['key'] is None and curch == '=':  # value found108                check_set_key(somestr)109                somestr = ''110            elif curch == ';' or curch == IRCLine.WHITESPACE:  # end of tag (or tags)111                if not tag['key']:112                    check_set_key(somestr)113                else:114                    tag['value'] = unescape_val(somestr)115                if rawvendor:116                    self.tags[f'{rawvendor}/{tag["key"]}'] = tag117                else:118                    self.tags[tag['key']] = tag119                self._cursor += 1120                if self._parse_complete(): raise self._parse_error("Line ended while parsing tag")121                return curch != IRCLine.WHITESPACE122            else:123                somestr += curch124            self._cursor += 1125        raise self._parse_error("Line ended while parsing tag")126    def _parse_source(self):127        source = ''128        while not self._parse_complete():129            curch = self.line[self._cursor]130            if curch == IRCLine.WHITESPACE:131                if len(source) == 0: raise self._parse_error("Message source is empty")132                self.source = parse_nuh(source)133                self.sourceraw = create_nuh(self.source)134                return135            source += curch136            self._cursor += 1137        raise self._parse_error("Line ended while parsing source")138    def _parse_verb(self):139        verb = ''140        while not self._parse_complete():141            curch = self.line[self._cursor]142            if curch == IRCLine.WHITESPACE:143                break144            verb += curch145            self._cursor += 1146        if not re.fullmatch(IRCLine.VALID_VERB_PATTERN, verb):147            raise self._parse_error("Invalid verb")148        self.verb = verb.upper()149    def _parse_params(self):150        param = ''151        while not self._parse_complete():152            curch = self.line[self._cursor]153            if curch == IRCLine.WHITESPACE:154                self.params.append(param)155                self._skip_whitespace()156                param = ''157                continue158            if curch == ':' and len(param) == 0:159                self.params.append(self.line[self._cursor + 1:])160                return161            param += curch162            self._cursor += 1163        if len(param) != 0 and not param.isspace():164            self.params.append(param)165    def parse(self):166        if len(self.line) == 0:167            raise self._parse_error("Line is empty")168        if self._do_tags: self._parse_tags()169        self._skip_whitespace()170        if self._parse_complete(): raise self._parse_error("Line ended while parsing source or verb")171        if self.line[self._cursor] == ':':172            self._cursor += 1173            if self._parse_complete(): raise self._parse_error("Line ended while beginning to parse source")174            self._parse_source()175            self._skip_whitespace()176            if self._parse_complete(): raise self._parse_error("Line ended after parsing source")177        self._parse_verb()178        self._skip_whitespace()179        if self._parse_complete(): return180        self._parse_params()181        self.line = None  # Allow this class to be used for 'sanitizing' lines182    def __str__(self):183        if self.line:184            return self.line185        ret = ''186        def tag_value_escape(s):187            esc = ''188            for ch in s:189                if ch in IRCLine.TAG_ESCAPE_CHARS_REV:190                    esc += '\\'191                    esc += IRCLine.TAG_ESCAPE_CHARS_REV[ch]192                else:193                    esc += ch194            return esc195        if self.tags:196            first = True197            for tagname in self.tags:198                if first:199                    first = False200                    ret += '@'201                else:202                    ret += ';'203                tag = self.tags[tagname]204                if tag['client']:205                    ret += IRCLine.TAG_CLIENT_PREFIX206                if tag['vendor']:207                    ret += tag['vendor'].encode('idna').decode('utf-8', errors='replace')208                    ret += '/'209                if not re.fullmatch(IRCLine.TAG_VALID_KEY_PATTERN, tag['key']):210                    raise ValueError("Invalid tag key")211                ret += tag['key']212                if tag['value']:213                    ret += '='214                    ret += tag_value_escape(tag['value'])215            ret += ' '216        if self.sourceraw:217            ret += ':'218            ret += self.sourceraw219            ret += ' '220        if not re.fullmatch(IRCLine.VALID_VERB_PATTERN, self.verb):221            raise ValueError("Invalid verb")222        ret += self.verb223        trailing = False224        for param in self.params:225            if trailing: raise ValueError("Unable to have arguments after trailing argument")226            ret += ' '227            if IRCLine.WHITESPACE in param or len(param) == 0:228                trailing = True229                ret += ':'230            ret += param231        self.line = ret232        return ret233PING_FREQ_SECS = 30234PING_TIMEOUT_SECS = 60235SEEK_NICK_CHECK_FREQ = 20236# TODO: does not handle casemapping AT ALL (assumes ascii)237class IRCBot:238    def __init__(self, nick='ircbot', ident='unknown', realname='realname'):239        self.nick = nick240        self.account = None241        self.ident = ident242        self.host = 'unknown'243        self.realname = realname244        self.registered = False245        self._writer = None246        self._shutdown = False247        self._last_ping = None248        self._ping_key = None249        self._seek_nick = nick250        self._seek_nick_check = time.time()251        self.pending_responses = {}252    async def handle_raw_line(self, recv):253        line = IRCLine(recv)254        try:255            line.parse()256            print(f" IN: {line}")257            try:258                method = getattr(self, "handle_verb_" + line.verb.lower())259                await method(line)260            except AttributeError:261                await self.handle_unknown_verb(line)262            except BaseException as ex:263                print(f"Error when handling line '{line}'")264                traceback.print_exception(type(ex), ex, ex.__traceback__)265        except ParseError as perr:266            print(f"Message parser error encountered, disconnecting: '{perr.desc}' (at index {perr.cursor})")267            print(f"Line in question: '{perr.line}'")268            await self.quit("Invalid message received")269    async def handle_unknown_verb(self, line):270        #print(f"Line has unknown verb: {line}")271        pass272    async def write_line(self, line: IRCLine):273        print(f"OUT: {line}")274        self._writer.write((str(line) + "\r\n").encode('utf-8', errors='replace'))275        await self._writer.drain()276    async def on_connect(self):277        await self.register()278    async def quit(self, message: str):279        await self.write_line(IRCLine(verb="QUIT", params=[message]))280    async def join(self, channel, key=None):281        params = [channel]282        if key is not None: params.append(key)283        await self.write_line(IRCLine(verb='JOIN', params=params))284    def shutdown(self):285        self._shutdown = True286    async def register(self):287        await self.write_line(IRCLine(verb="NICK", params=[self.nick]))288        await self.write_line(IRCLine(verb="USER", params=[self.ident, '0', '*', self.realname]))289    async def on_register(self):290        self.registered = True291    async def add_event(self, name: str, data: str):292        evt = asyncio.Event()293        if name in self.pending_responses:294            if data in self.pending_responses[name]:295                self.pending_responses[name][data].append(evt)296            else:297                self.pending_responses[name][data] = [evt]298        else:299            self.pending_responses[name] = {data: [evt]}300        await evt.wait()301    def event_complete(self, name: str, data: str):302        if name in self.pending_responses:303            if data in self.pending_responses[name]:304                for evt in self.pending_responses[name][data]:305                    evt.set()306                self.pending_responses[name][data].clear()307    async def handle_verb_001(self, line):308        await self.on_register()309    async def handle_verb_303(self, line):310        online = line.params[1]311        if self.nick != self._seek_nick and self._seek_nick not in online.split(' '):312            await self.write_line(IRCLine(verb='NICK', params=[self._seek_nick]))313    async def handle_verb_396(self, line):314        self.host = line.params[1]315    async def handle_verb_433(self, line):316        if not self.registered:317            self.nick += '_'318            await self.write_line(IRCLine(verb='NICK', params=[self.nick]))319    async def handle_verb_900(self, line):320        nuh = line.params[1]321        self.account = line.params[2]322        self.nick = nuh[:nuh.find('!')]323        self.ident = nuh[nuh.find('!')+1:nuh.find('@')]324        self.host = nuh[nuh.find('@')+1:]325    async def handle_verb_901(self, line):326        self.account = None327    async def handle_verb_nick(self, line):328        if self.nick == line.source['nick']:329            self.nick = line.params[0]330    async def handle_verb_quit(self, line):331        if self.nick != self._seek_nick and self._seek_nick == line.source['nick']:332            await self.write_line(IRCLine(verb='NICK', params=[self._seek_nick]))333    async def handle_verb_ping(self, line):334        await self.write_line(IRCLine(verb='PONG', params=line.params))335    async def handle_verb_pong(self, line):336        if self._ping_key is not None and line.params[len(line.params)-1] == self._ping_key:337            self._ping_key = None338    async def tick_client(self):339        if not self.registered: return340        now = time.time()341        if self._ping_key is None:342            if self._last_ping is None or now - self._last_ping > PING_FREQ_SECS:343                self._ping_key = ("00000000" + hex(random.randint(0, 0x7fffffff))[2:])[-8:]344                self._last_ping = now345                await self.write_line(IRCLine(verb='PING', params=[self._ping_key]))346        else:347            if self._last_ping is not None and now - self._last_ping > PING_TIMEOUT_SECS:348                await self.quit(f"No ping reply in {int(now - self._last_ping)} seconds")349                self.shutdown()350        if self.nick != self._seek_nick and now - self._seek_nick_check > SEEK_NICK_CHECK_FREQ:351            await self.write_line(IRCLine(verb='ISON', params=[self._seek_nick]))352            self._seek_nick_check = now353    async def connect(self, host, port, ssl=None):354        reader, self._writer = await asyncio.open_connection(host=host, port=port, ssl=ssl)355        try:356            await self.on_connect()357            partial = ''358            while not reader.at_eof() and not self._shutdown:359                await self.tick_client()360                try:361                    data = await asyncio.wait_for(reader.read(16384), 5.0)362                    lines = data.decode("utf-8", errors="replace").split("\r\n")363                    lines[0] = partial + lines[0]364                    partial = lines.pop(-1)365                    for line in lines:366                        if line == '' or line.isspace(): continue367                        await self.handle_raw_line(line)368                except asyncio.TimeoutError:369                    pass370        finally:371            self._writer.close()372# Doesn't send CAP END, and will not ever finish registering on CAP 302 servers373class CapAwareIRCBot(IRCBot):374    def __init__(self, req_caps=None, **kwargs):375        super().__init__(**kwargs)376        if req_caps is None:377            req_caps = ['message-tags', 'extended-join', 'account-tag', 'cap-notify', 'multi-prefix']378        self.req_caps = req_caps379        self.server_caps = {}380        self.our_caps = {}381    async def register(self):382        await self.cap_ls(wait=False)383        await super().register()384    async def handle_verb_cap(self, line):385        subcmd = line.params[1]386        if subcmd == 'LS' or subcmd == 'NEW':  # they are replying to our CAP query387            caplist = line.params[len(line.params)-1].split(' ')388            caps = {}389            for capspec in caplist:390                capsplt = capspec.split('=', maxsplit=1)391                val = ''392                if len(capsplt) == 2:393                    val = capsplt[1]394                caps[capsplt[0]] = val395            if subcmd == 'LS': await self.on_cap_ls(caps)396            else: await self.on_cap_new(caps)397            if len(line.params) == 3:398                self.event_complete('cap_ls', '*')399        elif subcmd == 'ACK':400            caps = line.params[2].split(' ')401            for cap in caps:402                if cap[0] == '-':403                    cap = cap[1:]404                    if cap in self.our_caps:405                        self.our_caps.pop(cap)406                else:407                    self.our_caps[cap] = True408                await self.on_cap_ack(cap)409        elif subcmd == 'NAK':410            caps = line.params[2].split(' ')411            for cap in caps:412                await self.on_cap_nak(cap)413        elif subcmd == 'DEL':414            caps = line.params[2].split(' ')415            await self.on_cap_del(caps)416    async def cap_ls(self, wait=True):417        self.server_caps.clear()418        await self.write_line(IRCLine(verb='CAP', params=['LS', '302']))419        if wait:420            await self.add_event('cap_ls', '*')421    async def cap_req(self, caps: [str], wait=True):422        # FIXME: prevent from going over length and from requesting empty list423        await self.write_line(IRCLine(verb='CAP', params=['REQ', ' '.join(caps)]))424        if wait:425            coros = []426            for cap in caps:427                coros.append(self.add_event('cap_req', cap))428            await asyncio.gather(*coros)429    async def cap_end(self, wait=True):430        await self.write_line(IRCLine(verb='CAP', params=['END']))431    async def on_cap_ls(self, caps: {}):432        to_req = []433        for cap in caps:434            self.server_caps[cap] = caps[cap]435            if cap in self.req_caps and cap not in self.our_caps:436                to_req.append(cap)437        if len(to_req) > 0:438            await self.cap_req(to_req, wait=False)439    async def on_cap_ack(self, name: str):440        self.event_complete('cap_req', name)441    async def on_cap_nak(self, name: str):442        self.event_complete('cap_req', name)443    async def on_cap_new(self, caps: {}):444        await self.on_cap_ls(caps)445    async def on_cap_del(self, caps: [str]):446        for cap in caps:447            if cap in self.our_caps:448                self.our_caps.pop(cap)449            if cap in self.server_caps:450                self.server_caps.pop(cap)451class SASLIRCBot(CapAwareIRCBot):452    def __init__(self, sasl_uname, sasl_pass, require_auth=False, **kwargs):453        super().__init__(**kwargs)454        self.req_caps.append("sasl")455        self.sasl_auth = (sasl_uname, sasl_pass)456        self.require_auth = require_auth457    async def on_cap_ack(self, name: str):458        await super(SASLIRCBot, self).on_cap_ack(name)459        if name == 'sasl':460            mechs = self.server_caps['sasl'].split(',')461            if 'PLAIN' not in mechs and self.require_auth:462                await self.quit("SASL PLAIN not supported")463                return464            await self.write_line(IRCLine(verb='AUTHENTICATE', params=['PLAIN']))465    async def handle_verb_authenticate(self, line):466        if line.params[0] == '+':467            namebytes = self.sasl_auth[0].encode('utf-8', errors='replace')468            passbytes = self.sasl_auth[1].encode('utf-8', errors='replace')469            auth = namebytes + b'\x00' + namebytes + b'\x00' + passbytes470            await self.write_line(IRCLine(verb='AUTHENTICATE', params=[base64.b64encode(auth).decode('utf-8', errors='replace')]))471    async def sasl_error(self):472        if self.require_auth:473            await self.quit("Not authenticated but require_auth is enabled")474        else:475            await self.cap_end()476    async def handle_verb_901(self, line):477        await super().handle_verb_901(line)478        await self.sasl_error()479    async def handle_verb_902(self, line):480        await self.sasl_error()481    async def handle_verb_903(self, line):482        await self.cap_end()483    async def handle_verb_904(self, line):484        await self.sasl_error()485    async def handle_verb_905(self, line):486        await self.sasl_error()487    async def handle_verb_906(self, line):488        await self.sasl_error()489    async def handle_verb_907(self, line):490        await self.sasl_error()491    async def handle_verb_908(self, line):...filter.py
Source:filter.py  
...83    def _do_paths(self, node):84        return self._match_values(node.hierarchy_paths, self.paths)85    def _do_descriptions(self, node):86        return all(self._match_values(node.hierarchy_descriptions, descs) for descs in self.descriptions)87    def _do_tags(self, node):88        return all(self._match_values(node.hierarchy_tags, tags) for tags in self.tags)89    def _do_properties(self, node):90        return all(self._match_key_values(node.hierarchy_properties, props) for props in self.properties)91    def _do_links(self, node):92        return all(self._match_values_lists(node.hierarchy_links, links) for links in self.links)93    @staticmethod94    def _apply_criteria(obj, *criteria):95        return all(criterion(obj) for criterion in criteria)96    def __call__(self, node):97        assert isinstance(node, (BaseTest, BaseSuite))98        return self._apply_criteria(99            node, self._do_paths, self._do_descriptions, self._do_tags, self._do_properties, self._do_links100        )101class TestFilter(BaseTreeNodeFilter):...printer_neid.py
Source:printer_neid.py  
...75        root = ET.Element('Lemma', props)76        aprops = {}77        aprops['declension'] = str(adj.declension)78        atag = ET.SubElement(root, 'adjective', aprops)79        def _do_tags(root, list, name, mut):80            for sub in list:81                subtag = ET.SubElement(root, name)82                subtag.text = mutate(mut, sub.value)83        _do_tags(atag, adj.sg_nom, 'sgNomMasc', Mutation.NoMut)84        _do_tags(atag, adj.sg_nom, 'sgNomFem', Mutation.Len1)85        _do_tags(atag, adj.sg_gen_masc, 'sgGenMasc', Mutation.Len1)86        _do_tags(atag, adj.sg_gen_fem, 'sgGenFem', Mutation.NoMut)87        _do_tags(atag, adj.pl_nom, 'plNom', Mutation.NoMut)88        _do_tags(atag, adj.pl_nom, 'plNomSlen', Mutation.Len1)89        _do_tags(atag, adj.pl_nom, 'plGenStrong', Mutation.NoMut)90        _do_tags(atag, adj.sg_nom, 'plGenWeak', Mutation.NoMut)91        for form in adj.get_compar_pres():92            subtag = ET.SubElement(atag, 'comparPres')93            subtag.text = form.value94        for form in adj.get_compar_past():95            subtag = ET.SubElement(atag, 'comparPast')96            subtag.text = form.value97        for form in adj.get_super_pres():98            subtag = ET.SubElement(atag, 'superPres')99            subtag.text = form.value100        for form in adj.get_super_past():101            subtag = ET.SubElement(atag, 'superPast')102            subtag.text = form.value103        for form in adj.abstract:104            subtag = ET.SubElement(atag, 'abstractNoun')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
