How to use _do_tags method in Lemoncheesecake

Best Python code snippet using lemoncheesecake

irc.py

Source:irc.py Github

copy

Full Screen

import asyncio
import time
import re
import base64
import random
import traceback


class ParseError(Exception):
    """Raised when a raw IRC line cannot be parsed.

    Attributes:
        desc: human-readable description of the problem.
        line: the raw line that was being parsed.
        cursor: index into ``line`` at which parsing stopped.
    """

    def __init__(self, desc, line, cursor):
        super().__init__(desc)
        self.desc = desc
        self.line = line
        self.cursor = cursor


def parse_nuh(s):
    """Split a ``nick[!ident][@host]`` source string into its parts.

    Returns ``None`` for an empty/``None`` input, otherwise a dict with the
    keys ``nick``, ``ident`` and ``host`` (missing parts are empty strings).
    """
    if not s:
        return None
    nick, bang, rest = s.partition('!')
    if bang:
        # partition() never yields -1 style slicing bugs when '@' is absent.
        ident, _, host = rest.partition('@')
    else:
        ident = ''
        nick, _, host = s.partition('@')
    return {'nick': nick, 'ident': ident, 'host': host}


def create_nuh(nuh):
    """Build a ``nick[!ident][@host]`` string from a dict made by parse_nuh."""
    out = nuh['nick']
    if nuh['ident']:
        out += '!' + nuh['ident']
    if nuh['host']:
        out += '@' + nuh['host']
    return out


class IRCLine:
    """A single IRC protocol line, either parsed from text or built from parts.

    Construct with ``line=...`` and call :meth:`parse` to populate ``tags``,
    ``source``, ``sourceraw``, ``verb`` and ``params``; or construct from the
    parts and call ``str()`` to serialize.
    """

    WHITESPACE = ' '
    TAGS_INDICATOR = '@'
    SOURCE_INDICATOR = ':'
    TAG_CLIENT_PREFIX = '+'
    TAG_SEPARATOR = ';'
    TAG_VALID_KEY_PATTERN = re.compile(r'[a-zA-Z\d\-]+')
    TAG_ESCAPE_CHARS = {':': ';', 's': ' ', '\\': '\\', 'r': '\r', 'n': '\n'}
    TAG_ESCAPE_CHARS_REV = {';': ':', ' ': 's', '\\': '\\', '\r': 'r', '\n': 'n'}
    # lowercase letters aren't technically allowed
    VALID_VERB_PATTERN = re.compile(r'\d{3}|[A-Za-z]+')

    def __init__(self, line=None, do_tags=True, tags=None, source=None, verb=None, params=None):
        if tags is None:
            tags = {}
        if params is None:
            params = []
        self._cursor = 0
        self.line = line
        self._do_tags = do_tags
        self.tags = tags
        self.source = parse_nuh(source)
        self.sourceraw = source
        self.verb = verb
        self.params = params

    def _parse_error(self, desc):
        """Create a ParseError carrying the current line and cursor."""
        return ParseError(desc, self.line, self._cursor)

    def _parse_complete(self):
        """Return True once the cursor has reached the end of the line."""
        return self._cursor >= len(self.line)

    # technically there should not be a bunch of whitespace in a row, but IRC
    # servers are weird and not necessarily standards-compliant
    def _skip_whitespace(self):
        """Advance the cursor past any run of spaces."""
        while not self._parse_complete() and self.line[self._cursor] == IRCLine.WHITESPACE:
            self._cursor += 1

    def _parse_tags(self):
        """Parse the optional leading ``@tag;tag `` section into ``self.tags``."""
        self._skip_whitespace()
        if self._parse_complete():
            raise self._parse_error("Line ended while parsing tags")
        if self.line[self._cursor] != IRCLine.TAGS_INDICATOR:
            return  # no tags here
        self._cursor += 1
        if self._parse_complete():
            raise self._parse_error("Line ended while parsing first tag")
        while self._parse_tag():
            pass

    @staticmethod
    def _unescape_tag_value(s):
        """Undo tag-value escaping; an unknown escape yields the bare character."""
        out = []
        escape = False
        for ch in s:
            if escape:
                escape = False
                out.append(IRCLine.TAG_ESCAPE_CHARS.get(ch, ch))
            elif ch == '\\':
                escape = True
            else:
                out.append(ch)
        return ''.join(out)

    def _parse_tag(self):
        """Parse one tag starting at the cursor and store it in ``self.tags``.

        Returns True when another tag follows (the tag ended with ``;``) and
        False when the tag section is finished (the tag ended with a space).
        Raises ParseError on an invalid key or a prematurely ended line.
        """
        tag = {'key': None, 'vendor': None, 'value': '', 'client': False}
        rawvendor = None

        def check_set_key(s):
            if not re.fullmatch(IRCLine.TAG_VALID_KEY_PATTERN, s):
                raise self._parse_error("Invalid tag key")
            tag['key'] = s

        if self.line[self._cursor] == IRCLine.TAG_CLIENT_PREFIX:
            tag['client'] = True
            self._cursor += 1
            if self._parse_complete():
                raise self._parse_error("Line ended while parsing tag key")

        somestr = ''
        while not self._parse_complete():
            curch = self.line[self._cursor]
            # A '/' only separates the vendor while we are still reading the
            # key; inside a value (e.g. "url=http://x") it is ordinary data.
            if tag['vendor'] is None and tag['key'] is None and curch == '/':
                if len(somestr) == 0:
                    raise self._parse_error("Empty vendor found while parsing tag")
                rawvendor = somestr
                try:
                    tag['vendor'] = rawvendor.encode().decode('idna')
                except UnicodeError as uerr:
                    print(f'Failed parsing IDNA vendor: {uerr}')
                    tag['vendor'] = rawvendor  # TODO: Handle this properly
                somestr = ''
            elif tag['key'] is None and curch == '=':  # value found
                check_set_key(somestr)
                somestr = ''
            elif curch == IRCLine.TAG_SEPARATOR or curch == IRCLine.WHITESPACE:  # end of tag (or tags)
                if not tag['key']:
                    check_set_key(somestr)
                else:
                    tag['value'] = self._unescape_tag_value(somestr)
                if rawvendor:
                    self.tags[f'{rawvendor}/{tag["key"]}'] = tag
                else:
                    self.tags[tag['key']] = tag
                self._cursor += 1
                if self._parse_complete():
                    raise self._parse_error("Line ended while parsing tag")
                return curch != IRCLine.WHITESPACE
            else:
                somestr += curch
            self._cursor += 1
        raise self._parse_error("Line ended while parsing tag")

    def _parse_source(self):
        """Read the source (after the leading ``:``) up to the next space."""
        source = ''
        while not self._parse_complete():
            curch = self.line[self._cursor]
            if curch == IRCLine.WHITESPACE:
                if len(source) == 0:
                    raise self._parse_error("Message source is empty")
                self.source = parse_nuh(source)
                self.sourceraw = create_nuh(self.source)
                return
            source += curch
            self._cursor += 1
        raise self._parse_error("Line ended while parsing source")

    def _parse_verb(self):
        """Read the verb up to the next space, validate it and upper-case it."""
        verb = ''
        while not self._parse_complete():
            curch = self.line[self._cursor]
            if curch == IRCLine.WHITESPACE:
                break
            verb += curch
            self._cursor += 1
        if not re.fullmatch(IRCLine.VALID_VERB_PATTERN, verb):
            raise self._parse_error("Invalid verb")
        self.verb = verb.upper()

    def _parse_params(self):
        """Append the remaining space-separated params to ``self.params``.

        A param beginning with ``:`` is the trailing param and takes the rest
        of the line verbatim.
        """
        param = ''
        while not self._parse_complete():
            curch = self.line[self._cursor]
            if curch == IRCLine.WHITESPACE:
                self.params.append(param)
                self._skip_whitespace()
                param = ''
                continue
            if curch == ':' and len(param) == 0:
                self.params.append(self.line[self._cursor + 1:])
                return
            param += curch
            self._cursor += 1
        if len(param) != 0 and not param.isspace():
            self.params.append(param)

    def parse(self):
        """Parse ``self.line`` into tags, source, verb and params.

        On success ``self.line`` is reset to ``None`` so that ``str()``
        re-serializes the parsed parts ('sanitizing' the line).
        Raises ParseError when the line is empty or malformed.
        """
        if not self.line:
            raise self._parse_error("Line is empty")
        if self._do_tags:
            self._parse_tags()
        self._skip_whitespace()
        if self._parse_complete():
            raise self._parse_error("Line ended while parsing source or verb")
        if self.line[self._cursor] == IRCLine.SOURCE_INDICATOR:
            self._cursor += 1
            if self._parse_complete():
                raise self._parse_error("Line ended while beginning to parse source")
            self._parse_source()
            self._skip_whitespace()
            if self._parse_complete():
                raise self._parse_error("Line ended after parsing source")
        self._parse_verb()
        self._skip_whitespace()
        if not self._parse_complete():
            self._parse_params()
        # Reset on every successful parse (including verb-only lines) so the
        # class can be used for 'sanitizing' lines.
        self.line = None

    @staticmethod
    def _escape_tag_value(s):
        """Escape a tag value for serialization."""
        out = []
        for ch in s:
            if ch in IRCLine.TAG_ESCAPE_CHARS_REV:
                out.append('\\' + IRCLine.TAG_ESCAPE_CHARS_REV[ch])
            else:
                out.append(ch)
        return ''.join(out)

    def __str__(self):
        """Serialize the line; the result is cached in ``self.line``.

        Raises ValueError for an invalid tag key or verb, or when a param
        follows a param that had to be sent as the trailing param.
        """
        if self.line:
            return self.line
        ret = ''
        if self.tags:
            pieces = []
            for tag in self.tags.values():
                piece = ''
                if tag['client']:
                    piece += IRCLine.TAG_CLIENT_PREFIX
                if tag['vendor']:
                    piece += tag['vendor'].encode('idna').decode('utf-8', errors='replace')
                    piece += '/'
                if not re.fullmatch(IRCLine.TAG_VALID_KEY_PATTERN, tag['key']):
                    raise ValueError("Invalid tag key")
                piece += tag['key']
                if tag['value']:
                    piece += '=' + self._escape_tag_value(tag['value'])
                pieces.append(piece)
            ret += IRCLine.TAGS_INDICATOR + IRCLine.TAG_SEPARATOR.join(pieces) + ' '
        if self.sourceraw:
            ret += IRCLine.SOURCE_INDICATOR + self.sourceraw + ' '
        if not self.verb or not re.fullmatch(IRCLine.VALID_VERB_PATTERN, self.verb):
            raise ValueError("Invalid verb")
        ret += self.verb
        trailing = False
        for param in self.params:
            if trailing:
                raise ValueError("Unable to have arguments after trailing argument")
            ret += ' '
            # Empty params, params with spaces and params that themselves
            # start with ':' can only be represented as the trailing param.
            if IRCLine.WHITESPACE in param or len(param) == 0 or param.startswith(':'):
                trailing = True
                ret += ':'
            ret += param
        self.line = ret
        return ret


PING_FREQ_SECS = 30
PING_TIMEOUT_SECS = 60
SEEK_NICK_CHECK_FREQ = 20


# TODO: does not handle casemapping AT ALL (assumes ascii)
class IRCBot:
    """Minimal asyncio IRC client that dispatches lines to ``handle_verb_*``."""

    def __init__(self, nick='ircbot', ident='unknown', realname='realname'):
        self.nick = nick
        self.account = None
        self.ident = ident
        self.host = 'unknown'
        self.realname = realname
        self.registered = False
        self._writer = None
        self._shutdown = False
        self._last_ping = None
        self._ping_key = None
        self._seek_nick = nick  # nick we try to (re)gain when it is in use
        self._seek_nick_check = time.time()
        self.pending_responses = {}  # name -> data -> list of asyncio.Event

    async def handle_raw_line(self, recv):
        """Parse one received line and dispatch it to its verb handler.

        A parse failure sends QUIT; an exception raised by a handler is
        printed and swallowed so one bad line does not kill the client.
        """
        line = IRCLine(recv)
        try:
            line.parse()
        except ParseError as perr:
            print(f"Message parser error encountered, disconnecting: '{perr.desc}' (at index {perr.cursor})")
            print(f"Line in question: '{perr.line}'")
            await self.quit("Invalid message received")
            return
        print(f" IN: {line}")
        # Look the handler up without try/except so that an AttributeError
        # raised *inside* a handler is not mistaken for an unknown verb.
        handler = getattr(self, "handle_verb_" + line.verb.lower(), None)
        try:
            if handler is None:
                await self.handle_unknown_verb(line)
            else:
                await handler(line)
        except Exception as ex:  # not BaseException: let cancellation propagate
            print(f"Error when handling line '{line}'")
            traceback.print_exception(type(ex), ex, ex.__traceback__)

    async def handle_unknown_verb(self, line):
        """Hook for verbs without a ``handle_verb_*`` method; does nothing."""
        # print(f"Line has unknown verb: {line}")
        pass

    async def write_line(self, line: IRCLine):
        """Serialize ``line``, send it followed by CRLF and drain the writer."""
        print(f"OUT: {line}")
        self._writer.write((str(line) + "\r\n").encode('utf-8', errors='replace'))
        await self._writer.drain()

    async def on_connect(self):
        """Called once the connection is open; starts registration."""
        await self.register()

    async def quit(self, message: str):
        """Send QUIT with the given message."""
        await self.write_line(IRCLine(verb="QUIT", params=[message]))

    async def join(self, channel, key=None):
        """Send JOIN for ``channel``, with ``key`` when one is given."""
        params = [channel]
        if key is not None:
            params.append(key)
        await self.write_line(IRCLine(verb='JOIN', params=params))

    def shutdown(self):
        """Ask the read loop in connect() to stop."""
        self._shutdown = True

    async def register(self):
        """Send the NICK and USER registration commands."""
        await self.write_line(IRCLine(verb="NICK", params=[self.nick]))
        await self.write_line(IRCLine(verb="USER", params=[self.ident, '0', '*', self.realname]))

    async def on_register(self):
        """Mark the client as registered."""
        self.registered = True

    async def add_event(self, name: str, data: str):
        """Register a waiter under (name, data) and block until it completes."""
        evt = asyncio.Event()
        self.pending_responses.setdefault(name, {}).setdefault(data, []).append(evt)
        await evt.wait()

    def event_complete(self, name: str, data: str):
        """Wake and discard every waiter registered under (name, data)."""
        waiters = self.pending_responses.get(name, {}).get(data)
        if waiters is None:
            return
        for evt in waiters:
            evt.set()
        waiters.clear()

    async def handle_verb_001(self, line):
        await self.on_register()

    async def handle_verb_303(self, line):
        # Reply to our ISON: reclaim the wanted nick when nobody holds it.
        online = line.params[1]
        if self.nick != self._seek_nick and self._seek_nick not in online.split(' '):
            await self.write_line(IRCLine(verb='NICK', params=[self._seek_nick]))

    async def handle_verb_396(self, line):
        self.host = line.params[1]

    async def handle_verb_433(self, line):
        # Nick rejected before registration finished: retry with a suffix.
        if not self.registered:
            self.nick += '_'
            await self.write_line(IRCLine(verb='NICK', params=[self.nick]))

    async def handle_verb_900(self, line):
        nuh = parse_nuh(line.params[1])
        self.account = line.params[2]
        if nuh:
            self.nick = nuh['nick']
            self.ident = nuh['ident']
            self.host = nuh['host']

    async def handle_verb_901(self, line):
        self.account = None

    async def handle_verb_nick(self, line):
        if self.nick == line.source['nick']:
            self.nick = line.params[0]

    async def handle_verb_quit(self, line):
        # The holder of the nick we want has quit: try to take it.
        if self.nick != self._seek_nick and self._seek_nick == line.source['nick']:
            await self.write_line(IRCLine(verb='NICK', params=[self._seek_nick]))

    async def handle_verb_ping(self, line):
        await self.write_line(IRCLine(verb='PONG', params=line.params))

    async def handle_verb_pong(self, line):
        if self._ping_key is not None and line.params[-1] == self._ping_key:
            self._ping_key = None

    async def tick_client(self):
        """Periodic housekeeping: keep-alive PINGs and wanted-nick polling."""
        if not self.registered:
            return
        now = time.time()
        if self._ping_key is None:
            if self._last_ping is None or now - self._last_ping > PING_FREQ_SECS:
                # 8 hex digits, zero padded
                self._ping_key = f"{random.randint(0, 0x7fffffff):08x}"
                self._last_ping = now
                await self.write_line(IRCLine(verb='PING', params=[self._ping_key]))
        elif self._last_ping is not None and now - self._last_ping > PING_TIMEOUT_SECS:
            await self.quit(f"No ping reply in {int(now - self._last_ping)} seconds")
            self.shutdown()
        if self.nick != self._seek_nick and now - self._seek_nick_check > SEEK_NICK_CHECK_FREQ:
            await self.write_line(IRCLine(verb='ISON', params=[self._seek_nick]))
            self._seek_nick_check = now

    async def connect(self, host, port, ssl=None):
        """Connect, register and run the read loop until EOF or shutdown()."""
        reader, self._writer = await asyncio.open_connection(host=host, port=port, ssl=ssl)
        try:
            await self.on_connect()
            # Buffer raw bytes and decode per complete line, so a multi-byte
            # UTF-8 sequence split across two reads is not corrupted.
            partial = b''
            while not reader.at_eof() and not self._shutdown:
                await self.tick_client()
                try:
                    data = await asyncio.wait_for(reader.read(16384), 5.0)
                except asyncio.TimeoutError:
                    continue  # nothing received; loop again to tick the client
                chunks = (partial + data).split(b"\r\n")
                partial = chunks.pop()
                for raw in chunks:
                    line = raw.decode("utf-8", errors="replace")
                    if line == '' or line.isspace():
                        continue
                    await self.handle_raw_line(line)
        finally:
            self._writer.close()


# Doesn't send CAP END, and will not ever finish registering on CAP 302 servers
class CapAwareIRCBot(IRCBot):
    """IRCBot that lists server capabilities and requests those in req_caps."""

    def __init__(self, req_caps=None, **kwargs):
        super().__init__(**kwargs)
        if req_caps is None:
            req_caps = ['message-tags', 'extended-join', 'account-tag', 'cap-notify', 'multi-prefix']
        # Copy so that later appends never mutate the caller's list.
        self.req_caps = list(req_caps)
        self.server_caps = {}
        self.our_caps = {}

    async def register(self):
        await self.cap_ls(wait=False)
        await super().register()

    async def handle_verb_cap(self, line):
        """Handle the CAP subcommands LS, NEW, ACK, NAK and DEL."""
        subcmd = line.params[1]
        if subcmd == 'LS' or subcmd == 'NEW':  # they are replying to our CAP query
            caps = {}
            # split() without a separator ignores repeated/trailing spaces
            for capspec in line.params[-1].split():
                name, _, val = capspec.partition('=')
                caps[name] = val
            if subcmd == 'LS':
                await self.on_cap_ls(caps)
            else:
                await self.on_cap_new(caps)
            if len(line.params) == 3:
                self.event_complete('cap_ls', '*')
        elif subcmd == 'ACK':
            for cap in line.params[2].split():
                if cap.startswith('-'):
                    cap = cap[1:]
                    self.our_caps.pop(cap, None)
                else:
                    self.our_caps[cap] = True
                await self.on_cap_ack(cap)
        elif subcmd == 'NAK':
            for cap in line.params[2].split():
                await self.on_cap_nak(cap)
        elif subcmd == 'DEL':
            await self.on_cap_del(line.params[2].split())

    async def cap_ls(self, wait=True):
        """Send ``CAP LS 302``; optionally wait for the listing to complete."""
        self.server_caps.clear()
        await self.write_line(IRCLine(verb='CAP', params=['LS', '302']))
        if wait:
            await self.add_event('cap_ls', '*')

    async def cap_req(self, caps: list, wait=True):
        """Send ``CAP REQ``; optionally wait until every cap is ACKed/NAKed."""
        # FIXME: prevent from going over length and from requesting empty list
        await self.write_line(IRCLine(verb='CAP', params=['REQ', ' '.join(caps)]))
        if wait:
            await asyncio.gather(*(self.add_event('cap_req', cap) for cap in caps))

    async def cap_end(self, wait=True):
        """Send ``CAP END`` (``wait`` is accepted but unused)."""
        await self.write_line(IRCLine(verb='CAP', params=['END']))

    async def on_cap_ls(self, caps: dict):
        """Record advertised caps and request the wanted ones we lack."""
        to_req = []
        for cap, value in caps.items():
            self.server_caps[cap] = value
            if cap in self.req_caps and cap not in self.our_caps:
                to_req.append(cap)
        if to_req:
            await self.cap_req(to_req, wait=False)

    async def on_cap_ack(self, name: str):
        self.event_complete('cap_req', name)

    async def on_cap_nak(self, name: str):
        self.event_complete('cap_req', name)

    async def on_cap_new(self, caps: dict):
        await self.on_cap_ls(caps)

    async def on_cap_del(self, caps: list):
        for cap in caps:
            self.our_caps.pop(cap, None)
            self.server_caps.pop(cap, None)


class SASLIRCBot(CapAwareIRCBot):
    """CapAwareIRCBot that authenticates with SASL PLAIN once 'sasl' is ACKed."""

    def __init__(self, sasl_uname, sasl_pass, require_auth=False, **kwargs):
        super().__init__(**kwargs)
        if "sasl" not in self.req_caps:
            self.req_caps.append("sasl")
        self.sasl_auth = (sasl_uname, sasl_pass)
        self.require_auth = require_auth

    async def on_cap_ack(self, name: str):
        await super(SASLIRCBot, self).on_cap_ack(name)
        if name == 'sasl':
            mechs = self.server_caps['sasl'].split(',')
            if 'PLAIN' not in mechs and self.require_auth:
                await self.quit("SASL PLAIN not supported")
                return
            await self.write_line(IRCLine(verb='AUTHENTICATE', params=['PLAIN']))

    async def handle_verb_authenticate(self, line):
        if line.params[0] == '+':
            namebytes = self.sasl_auth[0].encode('utf-8', errors='replace')
            passbytes = self.sasl_auth[1].encode('utf-8', errors='replace')
            # PLAIN payload: authzid NUL authcid NUL password (authzid == authcid)
            auth = namebytes + b'\x00' + namebytes + b'\x00' + passbytes
            await self.write_line(IRCLine(verb='AUTHENTICATE', params=[base64.b64encode(auth).decode('utf-8', errors='replace')]))

    async def sasl_error(self):
        """Quit when auth is mandatory, otherwise finish capability negotiation."""
        if self.require_auth:
            await self.quit("Not authenticated but require_auth is enabled")
        else:
            await self.cap_end()

    async def handle_verb_901(self, line):
        await super().handle_verb_901(line)
        await self.sasl_error()

    async def handle_verb_902(self, line):
        await self.sasl_error()

    async def handle_verb_903(self, line):
        await self.cap_end()

    async def handle_verb_904(self, line):
        await self.sasl_error()

    async def handle_verb_905(self, line):
        await self.sasl_error()

    async def handle_verb_906(self, line):
        await self.sasl_error()

    async def handle_verb_907(self, line):
        await self.sasl_error()

    async def handle_verb_908(self, line):
        # NOTE(review): the excerpt is truncated at this point; the original
        # body of this handler is not visible and is intentionally not guessed.
        ...

Full Screen

Full Screen

filter.py

Source:filter.py Github

copy

Full Screen

...83 def _do_paths(self, node):84 return self._match_values(node.hierarchy_paths, self.paths)85 def _do_descriptions(self, node):86 return all(self._match_values(node.hierarchy_descriptions, descs) for descs in self.descriptions)87 def _do_tags(self, node):88 return all(self._match_values(node.hierarchy_tags, tags) for tags in self.tags)89 def _do_properties(self, node):90 return all(self._match_key_values(node.hierarchy_properties, props) for props in self.properties)91 def _do_links(self, node):92 return all(self._match_values_lists(node.hierarchy_links, links) for links in self.links)93 @staticmethod94 def _apply_criteria(obj, *criteria):95 return all(criterion(obj) for criterion in criteria)96 def __call__(self, node):97 assert isinstance(node, (BaseTest, BaseSuite))98 return self._apply_criteria(99 node, self._do_paths, self._do_descriptions, self._do_tags, self._do_properties, self._do_links100 )101class TestFilter(BaseTreeNodeFilter):...

Full Screen

Full Screen

printer_neid.py

Source:printer_neid.py Github

copy

Full Screen

...75 root = ET.Element('Lemma', props)76 aprops = {}77 aprops['declension'] = str(adj.declension)78 atag = ET.SubElement(root, 'adjective', aprops)79 def _do_tags(root, list, name, mut):80 for sub in list:81 subtag = ET.SubElement(root, name)82 subtag.text = mutate(mut, sub.value)83 _do_tags(atag, adj.sg_nom, 'sgNomMasc', Mutation.NoMut)84 _do_tags(atag, adj.sg_nom, 'sgNomFem', Mutation.Len1)85 _do_tags(atag, adj.sg_gen_masc, 'sgGenMasc', Mutation.Len1)86 _do_tags(atag, adj.sg_gen_fem, 'sgGenFem', Mutation.NoMut)87 _do_tags(atag, adj.pl_nom, 'plNom', Mutation.NoMut)88 _do_tags(atag, adj.pl_nom, 'plNomSlen', Mutation.Len1)89 _do_tags(atag, adj.pl_nom, 'plGenStrong', Mutation.NoMut)90 _do_tags(atag, adj.sg_nom, 'plGenWeak', Mutation.NoMut)91 for form in adj.get_compar_pres():92 subtag = ET.SubElement(atag, 'comparPres')93 subtag.text = form.value94 for form in adj.get_compar_past():95 subtag = ET.SubElement(atag, 'comparPast')96 subtag.text = form.value97 for form in adj.get_super_pres():98 subtag = ET.SubElement(atag, 'superPres')99 subtag.text = form.value100 for form in adj.get_super_past():101 subtag = ET.SubElement(atag, 'superPast')102 subtag.text = form.value103 for form in adj.abstract:104 subtag = ET.SubElement(atag, 'abstractNoun')...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub — from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Lemoncheesecake automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful