Best JavaScript code snippet using playwright-internal
node.py
Source:node.py
...105 print("%s" % cmd)106 self.pexpect = pexpect.spawn(cmd, timeout=4)107 # Add delay to ensure that the process is ready to receive commands.108 time.sleep(0.2)109 self._expect('spinel-cli >')110 self.debug(int(os.getenv('DEBUG', '0')))111 def _expect(self, pattern, timeout=-1, *args, **kwargs):112 """ Process simulator events until expected the pattern. """113 if timeout == -1:114 timeout = self.pexpect.timeout115 assert timeout > 0116 while timeout > 0:117 try:118 return self.pexpect.expect(pattern, 0.1, *args, **kwargs)119 except pexpect.TIMEOUT:120 timeout -= 0.1121 self.simulator.go(0)122 if timeout <= 0:123 raise124 def __init_soc(self, nodeid):125 """ Initialize a System-on-a-chip node connected via UART. """126 import fdpexpect127 serialPort = '/dev/ttyUSB%d' % ((nodeid - 1) * 2)128 self.pexpect = fdpexpect.fdspawn(129 os.open(serialPort, os.O_RDWR | os.O_NONBLOCK | os.O_NOCTTY)130 )131 def __del__(self):132 self.destroy()133 def destroy(self):134 if not self._initialized:135 return136 if (137 hasattr(self.pexpect, 'proc')138 and self.pexpect.proc.poll() is None139 or not hasattr(self.pexpect, 'proc')140 and self.pexpect.isalive()141 ):142 print("%d: exit" % self.nodeid)143 self.pexpect.send('exit\n')144 self.pexpect.expect(pexpect.EOF)145 self.pexpect.wait()146 self._initialized = False147 def read_cert_messages_in_commissioning_log(self, timeout=-1):148 """Get the log of the traffic after DTLS handshake.149 """150 format_str = br"=+?\[\[THCI\].*?type=%s.*?\].*?=+?[\s\S]+?-{40,}"151 join_fin_req = format_str % br"JOIN_FIN\.req"152 join_fin_rsp = format_str % br"JOIN_FIN\.rsp"153 dummy_format_str = br"\[THCI\].*?type=%s.*?"154 join_ent_ntf = dummy_format_str % br"JOIN_ENT\.ntf"155 join_ent_rsp = dummy_format_str % br"JOIN_ENT\.rsp"156 pattern = (157 b"("158 + join_fin_req159 + b")|("160 + join_fin_rsp161 + b")|("162 + join_ent_ntf163 + b")|("164 + join_ent_rsp165 + b")"166 )167 messages = []168 # There are at most 4 cert messages both for joiner and 
commissioner169 for _ in range(0, 4):170 try:171 self._expect(pattern, timeout=timeout)172 log = self.pexpect.match.group(0)173 messages.append(self._extract_cert_message(log))174 except BaseException:175 break176 return messages177 def _extract_cert_message(self, log):178 res = re.search(br"direction=\w+", log)179 assert res180 direction = res.group(0).split(b'=')[1].strip()181 res = re.search(br"type=\S+", log)182 assert res183 type = res.group(0).split(b'=')[1].strip()184 payload = bytearray([])185 payload_len = 0186 if type in [b"JOIN_FIN.req", b"JOIN_FIN.rsp"]:187 res = re.search(br"len=\d+", log)188 assert res189 payload_len = int(res.group(0).split(b'=')[1].strip())190 hex_pattern = br"\|(\s([0-9a-fA-F]{2}|\.\.))+?\s+?\|"191 while True:192 res = re.search(hex_pattern, log)193 if not res:194 break195 data = [196 int(hex, 16)197 for hex in res.group(0)[1:-1].split(b' ')198 if hex and hex != b'..'199 ]200 payload += bytearray(data)201 log = log[res.end() - 1:]202 assert len(payload) == payload_len203 return (direction, type, payload)204 def send_command(self, cmd, go=True):205 print("%d: %s" % (self.nodeid, cmd))206 self.pexpect.send(cmd + '\n')207 if go:208 self.simulator.go(0, nodeid=self.nodeid)209 sys.stdout.flush()210 def get_commands(self):211 self.send_command('?')212 self._expect('Commands:')213 commands = []214 while True:215 i = self._expect(['Done', r'(\S+)'])216 if i != 0:217 commands.append(self.pexpect.match.groups()[0])218 else:219 break220 return commands221 def set_mode(self, mode):222 cmd = 'mode %s' % mode223 self.send_command(cmd)224 self._expect('Done')225 def debug(self, level):226 # `debug` command will not trigger interaction with simulator227 self.send_command('debug %d' % level, go=False)228 def start(self):229 self.interface_up()230 self.thread_start()231 def stop(self):232 self.thread_stop()233 self.interface_down()234 def interface_up(self):235 self.send_command('ifconfig up')236 self._expect('Done')237 def interface_down(self):238 
self.send_command('ifconfig down')239 self._expect('Done')240 def thread_start(self):241 self.send_command('thread start')242 self._expect('Done')243 def thread_stop(self):244 self.send_command('thread stop')245 self._expect('Done')246 def commissioner_start(self):247 cmd = 'commissioner start'248 self.send_command(cmd)249 self._expect('Done')250 def commissioner_add_joiner(self, addr, psk):251 cmd = 'commissioner joiner add %s %s' % (addr, psk)252 self.send_command(cmd)253 self._expect('Done')254 def joiner_start(self, pskd='', provisioning_url=''):255 cmd = 'joiner start %s %s' % (pskd, provisioning_url)256 self.send_command(cmd)257 self._expect('Done')258 def clear_whitelist(self):259 cmd = 'macfilter addr clear'260 self.send_command(cmd)261 self._expect('Done')262 def enable_whitelist(self):263 cmd = 'macfilter addr whitelist'264 self.send_command(cmd)265 self._expect('Done')266 def disable_whitelist(self):267 cmd = 'macfilter addr disable'268 self.send_command(cmd)269 self._expect('Done')270 def add_whitelist(self, addr, rssi=None):271 cmd = 'macfilter addr add %s' % addr272 if rssi is not None:273 cmd += ' %s' % rssi274 self.send_command(cmd)275 self._expect('Done')276 def remove_whitelist(self, addr):277 cmd = 'macfilter addr remove %s' % addr278 self.send_command(cmd)279 self._expect('Done')280 def get_addr16(self):281 self.send_command('rloc16')282 i = self._expect('([0-9a-fA-F]{4})')283 if i == 0:284 addr16 = int(self.pexpect.match.groups()[0], 16)285 self._expect('Done')286 return addr16287 def get_router_id(self):288 rloc16 = self.get_addr16()289 return rloc16 >> 10290 def get_addr64(self):291 self.send_command('extaddr')292 i = self._expect('([0-9a-fA-F]{16})')293 if i == 0:294 addr64 = self.pexpect.match.groups()[0].decode("utf-8")295 self._expect('Done')296 return addr64297 def get_eui64(self):298 self.send_command('eui64')299 i = self._expect('([0-9a-fA-F]{16})')300 if i == 0:301 addr64 = self.pexpect.match.groups()[0].decode("utf-8")302 
self._expect('Done')303 return addr64304 def get_joiner_id(self):305 self.send_command('joiner id')306 i = self._expect('([0-9a-fA-F]{16})')307 if i == 0:308 addr = self.pexpect.match.groups()[0].decode("utf-8")309 self._expect('Done')310 return addr311 def get_channel(self):312 self.send_command('channel')313 i = self._expect(r'(\d+)\r?\n')314 if i == 0:315 channel = int(self.pexpect.match.groups()[0])316 self._expect('Done')317 return channel318 def set_channel(self, channel):319 cmd = 'channel %d' % channel320 self.send_command(cmd)321 self._expect('Done')322 def get_masterkey(self):323 self.send_command('masterkey')324 i = self._expect('([0-9a-fA-F]{32})')325 if i == 0:326 masterkey = self.pexpect.match.groups()[0].decode("utf-8")327 self._expect('Done')328 return masterkey329 def set_masterkey(self, masterkey):330 cmd = 'masterkey %s' % masterkey331 self.send_command(cmd)332 self._expect('Done')333 def get_key_sequence_counter(self):334 self.send_command('keysequence counter')335 i = self._expect(r'(\d+)\r?\n')336 if i == 0:337 key_sequence_counter = int(self.pexpect.match.groups()[0])338 self._expect('Done')339 return key_sequence_counter340 def set_key_sequence_counter(self, key_sequence_counter):341 cmd = 'keysequence counter %d' % key_sequence_counter342 self.send_command(cmd)343 self._expect('Done')344 def set_key_switch_guardtime(self, key_switch_guardtime):345 cmd = 'keysequence guardtime %d' % key_switch_guardtime346 self.send_command(cmd)347 self._expect('Done')348 def set_network_id_timeout(self, network_id_timeout):349 cmd = 'networkidtimeout %d' % network_id_timeout350 self.send_command(cmd)351 self._expect('Done')352 def get_network_name(self):353 self.send_command('networkname')354 while True:355 i = self._expect(['Done', r'(\S+)'])356 if i != 0:357 network_name = self.pexpect.match.groups()[0].decode('utf-8')358 else:359 break360 return network_name361 def set_network_name(self, network_name):362 cmd = 'networkname %s' % network_name363 
self.send_command(cmd)364 self._expect('Done')365 def get_panid(self):366 self.send_command('panid')367 i = self._expect('([0-9a-fA-F]{4})')368 if i == 0:369 panid = int(self.pexpect.match.groups()[0], 16)370 self._expect('Done')371 return panid372 def set_panid(self, panid=config.PANID):373 cmd = 'panid %d' % panid374 self.send_command(cmd)375 self._expect('Done')376 def get_partition_id(self):377 self.send_command('leaderpartitionid')378 i = self._expect(r'(\d+)\r?\n')379 if i == 0:380 weight = self.pexpect.match.groups()[0]381 self._expect('Done')382 return weight383 def set_partition_id(self, partition_id):384 cmd = 'leaderpartitionid %d' % partition_id385 self.send_command(cmd)386 self._expect('Done')387 def set_router_upgrade_threshold(self, threshold):388 cmd = 'routerupgradethreshold %d' % threshold389 self.send_command(cmd)390 self._expect('Done')391 def set_router_downgrade_threshold(self, threshold):392 cmd = 'routerdowngradethreshold %d' % threshold393 self.send_command(cmd)394 self._expect('Done')395 def release_router_id(self, router_id):396 cmd = 'releaserouterid %d' % router_id397 self.send_command(cmd)398 self._expect('Done')399 def get_state(self):400 states = [r'\ndetached', r'\nchild', r'\nrouter', r'\nleader']401 self.send_command('state')402 match = self._expect(states)403 self._expect('Done')404 return states[match].strip(r'\n')405 def set_state(self, state):406 cmd = 'state %s' % state407 self.send_command(cmd)408 self._expect('Done')409 def get_timeout(self):410 self.send_command('childtimeout')411 i = self._expect(r'(\d+)\r?\n')412 if i == 0:413 timeout = self.pexpect.match.groups()[0]414 self._expect('Done')415 return timeout416 def set_timeout(self, timeout):417 cmd = 'childtimeout %d' % timeout418 self.send_command(cmd)419 self._expect('Done')420 def set_max_children(self, number):421 cmd = 'childmax %d' % number422 self.send_command(cmd)423 self._expect('Done')424 def get_weight(self):425 self.send_command('leaderweight')426 i = 
self._expect(r'(\d+)\r?\n')427 if i == 0:428 weight = self.pexpect.match.groups()[0]429 self._expect('Done')430 return weight431 def set_weight(self, weight):432 cmd = 'leaderweight %d' % weight433 self.send_command(cmd)434 self._expect('Done')435 def add_ipaddr(self, ipaddr):436 cmd = 'ipaddr add %s' % ipaddr437 self.send_command(cmd)438 self._expect('Done')439 def get_addrs(self):440 addrs = []441 self.send_command('ipaddr')442 while True:443 i = self._expect([r'(\S+(:\S*)+)\r?\n', 'Done'])444 if i == 0:445 addrs.append(self.pexpect.match.groups()[0].decode("utf-8"))446 elif i == 1:447 break448 return addrs449 def get_addr(self, prefix):450 network = ipaddress.ip_network(u'%s' % str(prefix))451 addrs = self.get_addrs()452 for addr in addrs:453 if isinstance(addr, bytearray):454 addr = bytes(addr)455 ipv6_address = ipaddress.ip_address(addr)456 if ipv6_address in network:457 return ipv6_address.exploded458 return None459 def get_addr_rloc(self):460 addrs = self.get_addrs()461 for addr in addrs:462 segs = addr.split(':')463 if (464 segs[4] == '0'465 and segs[5] == 'ff'466 and segs[6] == 'fe00'467 and segs[7] != 'fc00'468 ):469 return addr470 return None471 def get_addr_leader_aloc(self):472 addrs = self.get_addrs()473 for addr in addrs:474 segs = addr.split(':')475 if (476 segs[4] == '0'477 and segs[5] == 'ff'478 and segs[6] == 'fe00'479 and segs[7] == 'fc00'480 ):481 return addr482 return None483 def get_eidcaches(self):484 eidcaches = []485 self.send_command('eidcache')486 while True:487 i = self._expect([r'([a-fA-F0-9\:]+) ([a-fA-F0-9]+)\r?\n', 'Done'])488 if i == 0:489 eid = self.pexpect.match.groups()[0].decode("utf-8")490 rloc = self.pexpect.match.groups()[1].decode("utf-8")491 eidcaches.append((eid, rloc))492 elif i == 1:493 break494 return eidcaches495 def add_service(self, enterpriseNumber, serviceData, serverData):496 cmd = 'service add %s %s %s' % (497 enterpriseNumber,498 serviceData,499 serverData,500 )501 self.send_command(cmd)502 
self._expect('Done')503 def remove_service(self, enterpriseNumber, serviceData):504 cmd = 'service remove %s %s' % (enterpriseNumber, serviceData)505 self.send_command(cmd)506 self._expect('Done')507 def __getLinkLocalAddress(self):508 for ip6Addr in self.get_addrs():509 if re.match(config.LINK_LOCAL_REGEX_PATTERN, ip6Addr, re.I):510 return ip6Addr511 return None512 def __getGlobalAddress(self):513 global_address = []514 for ip6Addr in self.get_addrs():515 if (516 (not re.match(config.LINK_LOCAL_REGEX_PATTERN, ip6Addr, re.I))517 and (518 not re.match(519 config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I520 )521 )522 and (523 not re.match(524 config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I525 )526 )527 ):528 global_address.append(ip6Addr)529 return global_address530 def __getRloc(self):531 for ip6Addr in self.get_addrs():532 if (533 re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I)534 and re.match(535 config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I536 )537 and not (538 re.match(config.ALOC_FLAG_REGEX_PATTERN, ip6Addr, re.I)539 )540 ):541 return ip6Addr542 return None543 def __getAloc(self):544 aloc = []545 for ip6Addr in self.get_addrs():546 if (547 re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I)548 and re.match(549 config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I550 )551 and re.match(config.ALOC_FLAG_REGEX_PATTERN, ip6Addr, re.I)552 ):553 aloc.append(ip6Addr)554 return aloc555 def __getMleid(self):556 for ip6Addr in self.get_addrs():557 if re.match(558 config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I559 ) and not (560 re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I)561 ):562 return ip6Addr563 return None564 def get_ip6_address(self, address_type):565 """Get specific type of IPv6 address configured on thread device.566 Args:567 address_type: the config.ADDRESS_TYPE type of IPv6 address.568 Returns:569 IPv6 address string.570 """571 if address_type == config.ADDRESS_TYPE.LINK_LOCAL:572 return 
self.__getLinkLocalAddress()573 elif address_type == config.ADDRESS_TYPE.GLOBAL:574 return self.__getGlobalAddress()575 elif address_type == config.ADDRESS_TYPE.RLOC:576 return self.__getRloc()577 elif address_type == config.ADDRESS_TYPE.ALOC:578 return self.__getAloc()579 elif address_type == config.ADDRESS_TYPE.ML_EID:580 return self.__getMleid()581 else:582 return None583 return None584 def get_context_reuse_delay(self):585 self.send_command('contextreusedelay')586 i = self._expect(r'(\d+)\r?\n')587 if i == 0:588 timeout = self.pexpect.match.groups()[0]589 self._expect('Done')590 return timeout591 def set_context_reuse_delay(self, delay):592 cmd = 'contextreusedelay %d' % delay593 self.send_command(cmd)594 self._expect('Done')595 def add_prefix(self, prefix, flags, prf='med'):596 cmd = 'prefix add %s %s %s' % (prefix, flags, prf)597 self.send_command(cmd)598 self._expect('Done')599 def remove_prefix(self, prefix):600 cmd = 'prefix remove %s' % prefix601 self.send_command(cmd)602 self._expect('Done')603 def add_route(self, prefix, prf='med'):604 cmd = 'route add %s %s' % (prefix, prf)605 self.send_command(cmd)606 self._expect('Done')607 def remove_route(self, prefix):608 cmd = 'route remove %s' % prefix609 self.send_command(cmd)610 self._expect('Done')611 def register_netdata(self):612 self.send_command('netdataregister')613 self._expect('Done')614 def energy_scan(self, mask, count, period, scan_duration, ipaddr):615 cmd = 'commissioner energy %d %d %d %d %s' % (616 mask,617 count,618 period,619 scan_duration,620 ipaddr,621 )622 self.send_command(cmd)623 if isinstance(self.simulator, simulator.VirtualTime):624 self.simulator.go(8)625 timeout = 1626 else:627 timeout = 8628 self._expect('Energy:', timeout=timeout)629 def panid_query(self, panid, mask, ipaddr):630 cmd = 'commissioner panid %d %d %s' % (panid, mask, ipaddr)631 self.send_command(cmd)632 if isinstance(self.simulator, simulator.VirtualTime):633 self.simulator.go(8)634 timeout = 1635 else:636 timeout = 
8637 self._expect('Conflict:', timeout=timeout)638 def scan(self):639 self.send_command('scan')640 results = []641 while True:642 i = self._expect(643 [644 r'\|\s(\S+)\s+\|\s(\S+)\s+\|\s([0-9a-fA-F]{4})\s\|\s([0-9a-fA-F]{16})\s\|\s(\d+)\r?\n',645 'Done',646 ]647 )648 if i == 0:649 results.append(self.pexpect.match.groups())650 else:651 break652 return results653 def ping(self, ipaddr, num_responses=1, size=None, timeout=5):654 cmd = 'ping %s' % ipaddr655 if size is not None:656 cmd += ' %d' % size657 self.send_command(cmd)658 if isinstance(self.simulator, simulator.VirtualTime):659 self.simulator.go(timeout)660 result = True661 try:662 responders = {}663 while len(responders) < num_responses:664 i = self._expect([r'from (\S+):'])665 if i == 0:666 responders[self.pexpect.match.groups()[0]] = 1667 self._expect('\n')668 except (pexpect.TIMEOUT, socket.timeout):669 result = False670 if isinstance(self.simulator, simulator.VirtualTime):671 self.simulator.sync_devices()672 return result673 def reset(self):674 self.send_command('reset')675 time.sleep(0.1)676 def set_router_selection_jitter(self, jitter):677 cmd = 'routerselectionjitter %d' % jitter678 self.send_command(cmd)679 self._expect('Done')680 def set_active_dataset(681 self,682 timestamp,683 panid=None,684 channel=None,685 channel_mask=None,686 master_key=None,687 ):688 self.send_command('dataset clear')689 self._expect('Done')690 cmd = 'dataset activetimestamp %d' % timestamp691 self.send_command(cmd)692 self._expect('Done')693 if panid is not None:694 cmd = 'dataset panid %d' % panid695 self.send_command(cmd)696 self._expect('Done')697 if channel is not None:698 cmd = 'dataset channel %d' % channel699 self.send_command(cmd)700 self._expect('Done')701 if channel_mask is not None:702 cmd = 'dataset channelmask %d' % channel_mask703 self.send_command(cmd)704 self._expect('Done')705 if master_key is not None:706 cmd = 'dataset masterkey %s' % master_key707 self.send_command(cmd)708 self._expect('Done')709 
self.send_command('dataset commit active')710 self._expect('Done')711 def set_pending_dataset(712 self, pendingtimestamp, activetimestamp, panid=None, channel=None713 ):714 self.send_command('dataset clear')715 self._expect('Done')716 cmd = 'dataset pendingtimestamp %d' % pendingtimestamp717 self.send_command(cmd)718 self._expect('Done')719 cmd = 'dataset activetimestamp %d' % activetimestamp720 self.send_command(cmd)721 self._expect('Done')722 if panid is not None:723 cmd = 'dataset panid %d' % panid724 self.send_command(cmd)725 self._expect('Done')726 if channel is not None:727 cmd = 'dataset channel %d' % channel728 self.send_command(cmd)729 self._expect('Done')730 self.send_command('dataset commit pending')731 self._expect('Done')732 def announce_begin(self, mask, count, period, ipaddr):733 cmd = 'commissioner announce %d %d %d %s' % (734 mask,735 count,736 period,737 ipaddr,738 )739 self.send_command(cmd)740 self._expect('Done')741 def send_mgmt_active_set(742 self,743 active_timestamp=None,744 channel=None,745 channel_mask=None,746 extended_panid=None,747 panid=None,748 master_key=None,749 mesh_local=None,750 network_name=None,751 binary=None,752 ):753 cmd = 'dataset mgmtsetcommand active '754 if active_timestamp is not None:755 cmd += 'activetimestamp %d ' % active_timestamp756 if channel is not None:757 cmd += 'channel %d ' % channel758 if channel_mask is not None:759 cmd += 'channelmask %d ' % channel_mask760 if extended_panid is not None:761 cmd += 'extpanid %s ' % extended_panid762 if panid is not None:763 cmd += 'panid %d ' % panid764 if master_key is not None:765 cmd += 'masterkey %s ' % master_key766 if mesh_local is not None:767 cmd += 'localprefix %s ' % mesh_local768 if network_name is not None:769 cmd += 'networkname %s ' % network_name770 if binary is not None:771 cmd += 'binary %s ' % binary772 self.send_command(cmd)773 self._expect('Done')774 def send_mgmt_pending_set(775 self,776 pending_timestamp=None,777 active_timestamp=None,778 
delay_timer=None,779 channel=None,780 panid=None,781 master_key=None,782 mesh_local=None,783 network_name=None,784 ):785 cmd = 'dataset mgmtsetcommand pending '786 if pending_timestamp is not None:787 cmd += 'pendingtimestamp %d ' % pending_timestamp788 if active_timestamp is not None:789 cmd += 'activetimestamp %d ' % active_timestamp790 if delay_timer is not None:791 cmd += 'delaytimer %d ' % delay_timer792 if channel is not None:793 cmd += 'channel %d ' % channel794 if panid is not None:795 cmd += 'panid %d ' % panid796 if master_key is not None:797 cmd += 'masterkey %s ' % master_key798 if mesh_local is not None:799 cmd += 'localprefix %s ' % mesh_local800 if network_name is not None:801 cmd += 'networkname %s ' % network_name802 self.send_command(cmd)803 self._expect('Done')804 def coaps_start_psk(self, psk, pskIdentity):805 cmd = 'coaps psk %s %s' % (psk, pskIdentity)806 self.send_command(cmd)807 self._expect('Done')808 cmd = 'coaps start'809 self.send_command(cmd)810 self._expect('Done')811 def coaps_start_x509(self):812 cmd = 'coaps x509'813 self.send_command(cmd)814 self._expect('Done')815 cmd = 'coaps start'816 self.send_command(cmd)817 self._expect('Done')818 def coaps_set_resource_path(self, path):819 cmd = 'coaps resource %s' % path820 self.send_command(cmd)821 self._expect('Done')822 def coaps_stop(self):823 cmd = 'coaps stop'824 self.send_command(cmd)825 if isinstance(self.simulator, simulator.VirtualTime):826 self.simulator.go(5)827 timeout = 1828 else:829 timeout = 5830 self._expect('Done', timeout=timeout)831 def coaps_connect(self, ipaddr):832 cmd = 'coaps connect %s' % ipaddr833 self.send_command(cmd)834 if isinstance(self.simulator, simulator.VirtualTime):835 self.simulator.go(5)836 timeout = 1837 else:838 timeout = 5839 self._expect('coaps connected', timeout=timeout)840 def coaps_disconnect(self):841 cmd = 'coaps disconnect'842 self.send_command(cmd)843 self._expect('Done')844 self.simulator.go(5)845 def coaps_get(self):846 cmd = 'coaps get 
test'847 self.send_command(cmd)848 if isinstance(self.simulator, simulator.VirtualTime):849 self.simulator.go(5)850 timeout = 1851 else:852 timeout = 5853 self._expect('coaps response', timeout=timeout)854 def commissioner_mgmtset(self, tlvs_binary):855 cmd = 'commissioner mgmtset binary %s' % tlvs_binary856 self.send_command(cmd)857 self._expect('Done')858 def bytes_to_hex_str(self, src):859 return ''.join(format(x, '02x') for x in src)860 def commissioner_mgmtset_with_tlvs(self, tlvs):861 payload = bytearray()862 for tlv in tlvs:863 payload += tlv.to_hex()864 self.commissioner_mgmtset(self.bytes_to_hex_str(payload))865 def udp_start(self, local_ipaddr, local_port):866 cmd = 'udp open'867 self.send_command(cmd)868 self._expect('Done')869 cmd = 'udp bind %s %s' % (local_ipaddr, local_port)870 self.send_command(cmd)871 self._expect('Done')872 def udp_stop(self):873 cmd = 'udp close'874 self.send_command(cmd)875 self._expect('Done')876 def udp_send(self, bytes, ipaddr, port, success=True):877 cmd = 'udp send %s %d -s %d ' % (ipaddr, port, bytes)878 self.send_command(cmd)879 if success:880 self._expect('Done')881 else:882 self._expect('Error')883 def udp_check_rx(self, bytes_should_rx):884 self._expect('%d bytes' % bytes_should_rx)885 def router_list(self):886 cmd = 'router list'887 self.send_command(cmd)888 self._expect([r'(\d+)((\s\d+)*)'])889 g = self.pexpect.match.groups()890 router_list = g[0] + ' ' + g[1]891 router_list = [int(x) for x in router_list.split()]892 self._expect('Done')893 return router_list894 def router_table(self):895 cmd = 'router table'896 self.send_command(cmd)897 self._expect(r'(.*)Done')898 g = self.pexpect.match.groups()899 output = g[0]900 lines = output.strip().split('\n')901 lines = [l.strip() for l in lines]902 router_table = {}903 for i, line in enumerate(lines):904 if not line.startswith('|') or not line.endswith('|'):905 if i not in (0, 2):906 # should not happen907 print("unexpected line %d: %s" % (i, line))908 continue909 line = 
line[1:][:-1]910 line = [x.strip() for x in line.split('|')]911 if len(line) != 8:...
shtoken.py
Source:shtoken.py
1import unittest2from web3 import Web33from time import sleep4import settings as sts5import config6class SHToken(unittest.TestCase):7 @classmethod8 def setUpClass(cls):9 # åå§åæ°æ® sht-class, w3, sht10 while True:11 cls.w3 = Web3(Web3.HTTPProvider(sts.SIBBAY_SHT_NODE_HTTP, {"timeout": 30}))12 if cls.w3.isConnected() == False:13 print("node is not connected, wait " + str(5) + " second")14 sleep(5)15 else:16 print("connect to node by http: " + sts.SIBBAY_SHT_NODE_HTTP)17 break18 cls.sht = cls.w3.eth.contract(address=Web3.toChecksumAddress(sts.SIBBAY_SHT_ADDRESS), abi=sts.SIBBAY_SHT_ABI)19# @classmethod20# def tearDownClass(cls):21# cls.w3.providers[0]._socket.sock.close()22 # çå¾
确认23 def wait_tx_confirm(self, tx_hash):24 while True:25 sleep(config.waitting_time)26 logs = self.w3.eth.getTransactionReceipt(tx_hash)27 if logs is None:28 continue29 break30 return logs31 def create_account(self, _pwd):32 # å建账æ·33 ret = self.w3.personal.newAccount(_pwd)34 return ret35 # åéether36 def send_ether(self, _from, _to, _value, _pwd):37 _from = Web3.toChecksumAddress(_from)38 _to = Web3.toChecksumAddress(_to)39 # è®°å½è´¦æ·_toä½é¢40 balance_old = self.w3.eth.getBalance(_to)41 # 解é_fromè´¦æ·ï¼å¹¶è½¬è´¦42 self.w3.personal.unlockAccount(_from, _pwd)43 tx_hash = self.w3.eth.sendTransaction({"from": _from, "to": _to, "value": _value, "gas": config.gas, "gasPrice": config.gas_price})44 # çå¾
确认45 ret = self.wait_tx_confirm(tx_hash)46 print("send_ether gas used:", ret["gasUsed"])47 # æ¥çè´¦æ·_toä½é¢48 balance_new = self.w3.eth.getBalance(_to)49 self.assertEqual(balance_new - balance_old, _value)50 # åétoken51 # _from åéæ¹52 # _to æ¥æ¶æ¹53 # _value tokençæ°é54 # _pwd _fromç解éå¯ç 55 # _expect è´¦æ·_to tokençååå¼56 def transfer(self, _from, _to, _value, _pwd, _expect):57 _from = Web3.toChecksumAddress(_from)58 _to = Web3.toChecksumAddress(_to)59 # è®°å½è´¦æ·_fromçä½é¢60 balance_old_from = self.sht.functions.balanceOf(_from).call()61 # è®°å½è´¦æ·_toçä½é¢62 balance_old = self.sht.functions.balanceOf(_to).call()63 # 解é_fromè´¦æ·ï¼å¹¶åé_vaue个tokenç»è´¦æ·_to64 self.w3.personal.unlockAccount(_from, _pwd)65 tx_hash = self.sht.functions.transfer(_to, _value).transact({"from": _from, "gas": config.gas, "gasPrice": config.gas_price})66 # çå¾
确认67 ret = self.wait_tx_confirm(tx_hash)68 print("transfer gas used:", ret["gasUsed"])69 # æ¥çè´¦æ·_fromçä½é¢70 balance_new_from = self.sht.functions.balanceOf(_from).call()71 # æ¥çè´¦æ·_toçä½é¢72 balance_new = self.sht.functions.balanceOf(_to).call()73 self.assertEqual(balance_new - balance_old, _expect)74 self.assertEqual(balance_old_from - balance_new_from, _expect)75 def transfer_from(self, _spender, _from, _to, _value, _pwd, _expect):76 _spender = Web3.toChecksumAddress(_spender)77 _from = Web3.toChecksumAddress(_from)78 _to = Web3.toChecksumAddress(_to)79 # è®°å½è´¦æ·_fromçä½é¢80 balance_old_from = self.sht.functions.balanceOf(_from).call()81 # è®°å½è´¦æ·_toçä½é¢82 balance_old = self.sht.functions.balanceOf(_to).call()83 # è®°å½è´¦æ· _spender 代çä½é¢84 approve_balance_old = self.sht.functions.allowance(_from, _spender).call()85 # 解é_spenderè´¦æ·ï¼å¹¶åé_vaue个tokenç»è´¦æ·_to86 self.w3.personal.unlockAccount(_spender, _pwd)87 tx_hash = self.sht.functions.transferFrom(_from, _to, _value).transact({"from": _spender, "gas": config.gas, "gasPrice": config.gas_price})88 # çå¾
确认89 ret = self.wait_tx_confirm(tx_hash)90 print("transfer_from gas used:", ret["gasUsed"])91 # æ¥çè´¦æ·_fromçä½é¢92 balance_new_from = self.sht.functions.balanceOf(_from).call()93 # æ¥çè´¦æ·_toçä½é¢94 balance_new = self.sht.functions.balanceOf(_to).call()95 # æ¥çè´¦æ· _spender 代çä½é¢96 approve_balance_new = self.sht.functions.allowance(_from, _spender).call()97 self.assertEqual(balance_new - balance_old, _expect)98 self.assertEqual(balance_old_from - balance_new_from, _expect)99 self.assertEqual(approve_balance_old - approve_balance_new , _expect)100 def approve(self, _owner, _spender, _value, _pwd, _expect):101 # 转æ¢å°å102 _owner = Web3.toChecksumAddress(_owner)103 _spender = Web3.toChecksumAddress(_spender)104 # æ¥è¯¢ä»£çä½é¢105 ret = self.sht.functions.allowance(_owner, _spender).call()106 if ret == _value:107 return108 # 解é_ownerè´¦æ·ï¼å¹¶è®¾ç½®ä»£çä½é¢109 self.w3.personal.unlockAccount(_owner, _pwd)110 tx_hash = self.sht.functions.approve(_spender, _value).transact({"from": _owner, "gas": config.gas, "gasPrice": config.gas_price})111 # çå¾
确认112 ret = self.wait_tx_confirm(tx_hash)113 print("approve gas used:", ret["gasUsed"])114 # æ¥è¯¢ä»£çä½é¢115 ret = self.sht.functions.allowance(_owner, _spender).call()116 self.assertEqual(ret, _expect)117 def increase_approval(self, _owner, _spender, _value, _pwd, _expect):118 _owner = Web3.toChecksumAddress(_owner)119 _spender = Web3.toChecksumAddress(_spender)120 # è®°å½ä»£çä½é¢121 balance_old = self.sht.functions.allowance(_owner, _spender).call()122 # 解é_ownerè´¦æ·ï¼å¹¶è®¾ç½®ä»£çä½é¢123 self.w3.personal.unlockAccount(_owner, _pwd)124 tx_hash = self.sht.functions.increaseApproval(_spender, _value).transact({"from": _owner, "gas": config.gas, "gasPrice": config.gas_price})125 # çå¾
确认126 ret = self.wait_tx_confirm(tx_hash)127 print("increase_approval gas used:", ret["gasUsed"])128 # æ¥è¯¢ä»£çä½é¢129 balance_new = self.sht.functions.allowance(_owner, _spender).call()130 self.assertEqual(balance_new - balance_old, _expect)131 def decrease_approval(self, _owner, _spender, _value, _pwd, _expect):132 _owner = Web3.toChecksumAddress(_owner)133 _spender = Web3.toChecksumAddress(_spender)134 # è®°å½ä»£çä½é¢135 balance_old = self.sht.functions.allowance(_owner, _spender).call()136 # 解é_ownerè´¦æ·ï¼å¹¶è®¾ç½®ä»£çä½é¢137 self.w3.personal.unlockAccount(_owner, _pwd)138 tx_hash = self.sht.functions.decreaseApproval(_spender, _value).transact({"from": _owner, "gas": config.gas, "gasPrice": config.gas_price})139 # çå¾
# NOTE(review): the statements below are the tail of decrease_approval(); its
# `def` line precedes this excerpt, so they are kept as-is (only reformatted).
# Wait for the transaction to be confirmed.
ret = self.wait_tx_confirm(tx_hash)
print("decrease_approval gas used:", ret["gasUsed"])
# Query the allowance again and check that it dropped by the expected amount.
balance_new = self.sht.functions.allowance(_owner, _spender).call()
self.assertEqual(balance_old - balance_new, _expect)


def batch_transfer(self, _from, _receivers, _values, _pwd, _expects):
    """Send one batchTransfer from ``_from`` and verify every balance change.

    _from      -- sender address; it is unlocked with ``_pwd`` and pays the tx.
    _receivers -- list of receiver addresses; each entry is replaced in place
                  by its checksummed form.
    _values    -- amounts handed to the contract's batchTransfer.
    _pwd       -- passphrase used to unlock ``_from``.
    _expects   -- expected balance increase per receiver (same order as
                  ``_receivers``); their sum is the expected decrease of
                  ``_from``.
    """
    _from = Web3.toChecksumAddress(_from)
    # Record the sender's balance before the transfer.
    balance_old_from = self.sht.functions.balanceOf(_from).call()
    # Normalise the receiver addresses (in place) and record their balances.
    for i, receiver in enumerate(_receivers):
        _receivers[i] = Web3.toChecksumAddress(receiver)
    balances_old = [
        self.sht.functions.balanceOf(receiver).call() for receiver in _receivers
    ]
    # Unlock the sender account and send the batch.
    self.w3.personal.unlockAccount(_from, _pwd)
    tx_hash = self.sht.functions.batchTransfer(_receivers, _values).transact(
        {"from": _from, "gas": config.gas, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("batch_transfer with", str(len(_receivers)), "receivers gas used:", ret["gasUsed"])
    # Read the sender's balance after the transfer.
    balance_new_from = self.sht.functions.balanceOf(_from).call()
    # Every receiver must have gained exactly its expected amount.
    # _expects is indexed (not zipped) so a too-short list still raises.
    for i, (receiver, balance_old) in enumerate(zip(_receivers, balances_old)):
        balance_new = self.sht.functions.balanceOf(receiver).call()
        self.assertEqual(balance_new - balance_old, _expects[i])
    # The sender must have lost the sum of all expected amounts.
    self.assertEqual(balance_old_from - balance_new_from, sum(_expects))


def batch_transfer_from(self, _spender, _from, _receivers, _values, _pwd, _expects):
    """Send one batchTransferFrom as ``_spender`` and verify the balances.

    _spender   -- address that signs the transaction; it is the account that
                  gets unlocked with ``_pwd``.
    _from      -- address whose tokens are moved.
    _receivers -- list of receiver addresses; each entry is replaced in place
                  by its checksummed form.
    _values    -- amounts handed to the contract's batchTransferFrom.
    _pwd       -- passphrase used to unlock ``_spender``.
    _expects   -- expected balance increase per receiver; their sum is the
                  expected decrease of ``_from``.
    """
    _spender = Web3.toChecksumAddress(_spender)
    _from = Web3.toChecksumAddress(_from)
    # Record the balance of _from before the transfer.
    balance_old_from = self.sht.functions.balanceOf(_from).call()
    # Normalise the receiver addresses (in place) and record their balances.
    for i, receiver in enumerate(_receivers):
        _receivers[i] = Web3.toChecksumAddress(receiver)
    balances_old = [
        self.sht.functions.balanceOf(receiver).call() for receiver in _receivers
    ]
    # Unlock the spender account (the transaction sender) and send the batch.
    self.w3.personal.unlockAccount(_spender, _pwd)
    tx_hash = self.sht.functions.batchTransferFrom(_from, _receivers, _values).transact(
        {"from": _spender, "gas": config.gas, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("batch_transfer_from with", str(len(_receivers)), "receivers gas used:", ret["gasUsed"])
    # Read the balance of _from after the transfer.
    balance_new_from = self.sht.functions.balanceOf(_from).call()
    # Every receiver must have gained exactly its expected amount.
    for i, (receiver, balance_old) in enumerate(zip(_receivers, balances_old)):
        balance_new = self.sht.functions.balanceOf(receiver).call()
        self.assertEqual(balance_new - balance_old, _expects[i])
    # _from must have lost the sum of all expected amounts.
    self.assertEqual(balance_old_from - balance_new_from, sum(_expects))


def transfer_by_date(self, _from, _to, _values, _dates, _pwd, _expect):
    """Call transferByDate from ``_from`` to ``_to`` and verify total balances.

    ``_values`` and ``_dates`` are passed straight to the contract.
    ``_expect`` is both the expected decrease of ``_from`` and the expected
    increase of ``_to``, measured with totalBalanceOf.
    """
    _from = Web3.toChecksumAddress(_from)
    _to = Web3.toChecksumAddress(_to)
    # Record both total balances before the transfer.
    balance_old_from = self.sht.functions.totalBalanceOf(_from).call()
    balance_old = self.sht.functions.totalBalanceOf(_to).call()
    # Unlock the sender account and send the transaction.
    self.w3.personal.unlockAccount(_from, _pwd)
    tx_hash = self.sht.functions.transferByDate(_to, _values, _dates).transact(
        {"from": _from, "gas": config.gas, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("transfer_by_date with", str(len(_dates)), "dates gas used:", ret["gasUsed"])
    # Verify the sender's total balance.
    balance_new_from = self.sht.functions.totalBalanceOf(_from).call()
    self.assertEqual(balance_old_from - balance_new_from, _expect)
    # Verify the receiver's total balance.
    balance_new = self.sht.functions.totalBalanceOf(_to).call()
    self.assertEqual(balance_new - balance_old, _expect)


def transfer_from_by_date(self, _spender, _from, _to, _values, _dates, _pwd, _expect):
    """Call transferFromByDate as ``_spender`` and verify total balances.

    ``_spender`` is unlocked with ``_pwd`` and signs the transaction that
    moves tokens from ``_from`` to ``_to``. ``_expect`` is both the expected
    decrease of ``_from`` and the expected increase of ``_to``, measured
    with totalBalanceOf.
    """
    _spender = Web3.toChecksumAddress(_spender)
    _from = Web3.toChecksumAddress(_from)
    _to = Web3.toChecksumAddress(_to)
    # Record both total balances before the transfer.
    balance_old_from = self.sht.functions.totalBalanceOf(_from).call()
    balance_old = self.sht.functions.totalBalanceOf(_to).call()
    # Unlock the spender account and send the transaction.
    self.w3.personal.unlockAccount(_spender, _pwd)
    tx_hash = self.sht.functions.transferFromByDate(_from, _to, _values, _dates).transact(
        {"from": _spender, "gas": config.gas, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("transfer_from_by_date with", str(len(_dates)), "dates gas used:", ret["gasUsed"])
    # Verify the total balance of _from.
    balance_new_from = self.sht.functions.totalBalanceOf(_from).call()
    self.assertEqual(balance_old_from - balance_new_from, _expect)
    # Verify the total balance of _to.
    balance_new = self.sht.functions.totalBalanceOf(_to).call()
    self.assertEqual(balance_new - balance_old, _expect)


def add_token_to_fund(self, _owner, _values, _pwd, _expect):
    """Call addTokenToFund as ``_owner`` and verify both balances.

    ``_expect`` is the expected decrease of ``_owner`` and the expected
    increase of the address returned by the contract's fundAccount().
    """
    _owner = Web3.toChecksumAddress(_owner)
    # Record the owner's balance.
    balance_old = self.sht.functions.balanceOf(_owner).call()
    # Record the fund account's balance.
    fund_account = self.sht.functions.fundAccount().call()
    fund_account_balance_old = self.sht.functions.balanceOf(fund_account).call()
    # Unlock the owner account and release tokens to the fund.
    self.w3.personal.unlockAccount(_owner, _pwd)
    tx_hash = self.sht.functions.addTokenToFund(_values).transact(
        {"from": _owner, "gas": config.gas, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("add_token_to_fund value", str(_values), "gas used:", ret["gasUsed"])
    # Verify the owner's balance.
    balance_new = self.sht.functions.balanceOf(_owner).call()
    self.assertEqual(balance_old - balance_new, _expect)
    # Verify the fund account's balance.
    fund_account_balance_new = self.sht.functions.balanceOf(fund_account).call()
    self.assertEqual(fund_account_balance_new - fund_account_balance_old, _expect)
# Empty an account's token balance.
# _who     the account to be emptied
# _collect the account that receives the balance
# _pwd     unlock password of account _who
def clear_all_sht(self, _who, _collect, _pwd):
    """Transfer the whole balanceOf(_who) to ``_collect``; no-op when it is zero."""
    account = Web3.toChecksumAddress(_who)
    # Look up how much the account currently holds.
    amount = self.sht.functions.balanceOf(account).call()
    if amount <= 0:
        return
    # Move everything to the collecting account; transfer() also verifies it.
    self.transfer(account, _collect, amount, _pwd, amount)
# Empty all *available* balance of an account.
# Part of the balance may not have matured yet and cannot be moved, so only
# the amount reported by balanceOf() is transferred.
def clear_all_ava_sht(self, _who, _collect, _pwd):
    """Transfer balanceOf(_who) to ``_collect``; no-op when it is zero."""
    _who = Web3.toChecksumAddress(_who)
    # Fetch the account balance.
    balance = self.sht.functions.balanceOf(_who).call()
    # Transfer the whole balance to the collecting account.
    if balance > 0:
        self.transfer(_who, _collect, balance, _pwd, balance)


# Freeze an account.
# _admin administrator
# _who   account to be frozen
# _pwd   unlock password of the administrator
def froze(self, _admin, _who, _pwd):
    """Freeze ``_who`` as ``_admin``; returns early if it is already frozen."""
    _admin = Web3.toChecksumAddress(_admin)
    _who = Web3.toChecksumAddress(_who)
    # Nothing to do when the account is already on the frozen list.
    if self.sht.functions.frozenList(_who).call():
        return
    # Unlock the admin account and freeze the target account.
    self.w3.personal.unlockAccount(_admin, _pwd)
    tx_hash = self.sht.functions.froze(_who).transact(
        {"from": _admin, "gas": config.gas, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("froze gas used:", ret["gasUsed"])
    self.assertEqual(self.sht.functions.frozenList(_who).call(), True)


# Unfreeze an account.
# _admin administrator
# _who   account to be unfrozen
# _pwd   unlock password of the administrator
def unfroze(self, _admin, _who, _pwd):
    """Unfreeze ``_who`` as ``_admin``; returns early if it is not frozen."""
    _admin = Web3.toChecksumAddress(_admin)
    _who = Web3.toChecksumAddress(_who)
    # Nothing to do when the account is not on the frozen list.
    if not self.sht.functions.frozenList(_who).call():
        return
    # Unlock the admin account and unfreeze the target account.
    # NOTE(review): unlike froze(), no explicit "gas" is passed here.
    self.w3.personal.unlockAccount(_admin, _pwd)
    tx_hash = self.sht.functions.unfroze(_who).transact(
        {"from": _admin, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("unfroze gas used:", ret["gasUsed"])
    self.assertEqual(self.sht.functions.frozenList(_who).call(), False)


def set_sell_price(self, _admin, _price, _pwd):
    """Set the contract's sell price to ``_price``; no-op if already equal."""
    _admin = Web3.toChecksumAddress(_admin)
    # Check the current sell price.
    if self.sht.functions.sellPrice().call() == _price:
        return
    # Unlock the admin account and set the price.
    self.w3.personal.unlockAccount(_admin, _pwd)
    tx_hash = self.sht.functions.setSellPrice(_price).transact(
        {"from": _admin, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("set_sell_price gas used:", ret["gasUsed"])
    self.assertEqual(self.sht.functions.sellPrice().call(), _price)


def set_buy_price(self, _admin, _price, _pwd):
    """Set the contract's buy price to ``_price``; no-op if already equal."""
    _admin = Web3.toChecksumAddress(_admin)
    # Check the current buy price.
    if self.sht.functions.buyPrice().call() == _price:
        return
    # Unlock the admin account and set the price.
    self.w3.personal.unlockAccount(_admin, _pwd)
    tx_hash = self.sht.functions.setBuyPrice(_price).transact(
        {"from": _admin, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("set_buy_price gas used:", ret["gasUsed"])
    self.assertEqual(self.sht.functions.buyPrice().call(), _price)


def open_buy(self, _owner, _pwd):
    """Turn the contract's buy flag on; no-op if it is already on."""
    _owner = Web3.toChecksumAddress(_owner)
    # Check the current state.
    if self.sht.functions.buyFlag().call():
        return
    # Unlock the owner account and open the buy switch.
    self.w3.personal.unlockAccount(_owner, _pwd)
    tx_hash = self.sht.functions.openBuy().transact(
        {"from": _owner, "gas": config.gas, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("open_buy_sell gas used:", ret["gasUsed"])
    self.assertEqual(self.sht.functions.buyFlag().call(), True)


def close_buy(self, _owner, _pwd):
    """Turn the contract's buy flag off; no-op if it is already off."""
    _owner = Web3.toChecksumAddress(_owner)
    # Check the current state.
    if not self.sht.functions.buyFlag().call():
        return
    # Unlock the owner account and close the buy switch.
    self.w3.personal.unlockAccount(_owner, _pwd)
    tx_hash = self.sht.functions.closeBuy().transact(
        {"from": _owner, "gas": config.gas, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("close_buy_sell gas used:", ret["gasUsed"])
    self.assertEqual(self.sht.functions.buyFlag().call(), False)


def open_sell(self, _owner, _pwd):
    """Turn the contract's sell flag on; no-op if it is already on."""
    _owner = Web3.toChecksumAddress(_owner)
    # Check the current state.
    if self.sht.functions.sellFlag().call():
        return
    # Unlock the owner account and open the sell switch.
    self.w3.personal.unlockAccount(_owner, _pwd)
    tx_hash = self.sht.functions.openSell().transact(
        {"from": _owner, "gas": config.gas, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("open_buy_sell gas used:", ret["gasUsed"])
    self.assertEqual(self.sht.functions.sellFlag().call(), True)


def close_sell(self, _owner, _pwd):
    """Turn the contract's sell flag off; no-op if it is already off."""
    _owner = Web3.toChecksumAddress(_owner)
    # Check the current state.
    if not self.sht.functions.sellFlag().call():
        return
    # Unlock the owner account and close the sell switch.
    self.w3.personal.unlockAccount(_owner, _pwd)
    tx_hash = self.sht.functions.closeSell().transact(
        {"from": _owner, "gas": config.gas, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("close_buy_sell gas used:", ret["gasUsed"])
    self.assertEqual(self.sht.functions.sellFlag().call(), False)


# _value: ether value
def buy(self, _who, _value, _pwd, _expect):
    """Buy tokens as ``_who`` by sending ``_value`` and verify all balances.

    _who    -- buyer; unlocked with ``_pwd``.
    _value  -- amount attached to the transaction as its "value" field.
    _expect -- expected token increase of ``_who`` (and token decrease of the
               contract's fundAccount()).

    The ETH checks use ``_value`` itself, so they hold for any amount sent
    rather than only for exactly one ether.
    """
    _who = Web3.toChecksumAddress(_who)
    # Record the buyer's token and ETH balances.
    balance_old = self.sht.functions.balanceOf(_who).call()
    who_eth_balance_old = self.w3.eth.getBalance(_who)
    # Record the fund account's ETH and token balances.
    fund_account = self.sht.functions.fundAccount().call()
    fund_account_eth_balance_old = self.w3.eth.getBalance(fund_account)
    fund_account_balance_old = self.sht.functions.balanceOf(fund_account).call()
    # Unlock the buyer account and buy tokens for _value.
    self.w3.personal.unlockAccount(_who, _pwd)
    tx_hash = self.sht.functions.buy().transact(
        {"from": _who, "value": _value, "gas": config.gas, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("buy gas used:", ret["gasUsed"])
    # The buyer must have gained the expected amount of tokens.
    balance_new = self.sht.functions.balanceOf(_who).call()
    self.assertEqual(balance_new - balance_old, _expect)
    # The buyer must have spent the gas fee plus the value sent.
    who_eth_balance_new = self.w3.eth.getBalance(_who)
    self.assertEqual(
        who_eth_balance_old - who_eth_balance_new,
        ret["gasUsed"] * config.gas_price + _value,
    )
    # The fund account must have received the value sent.
    fund_account_eth_balance_new = self.w3.eth.getBalance(fund_account)
    self.assertEqual(fund_account_eth_balance_new - fund_account_eth_balance_old, _value)
    # The fund account must have handed out the expected amount of tokens.
    fund_account_balance_new = self.sht.functions.balanceOf(fund_account).call()
    self.assertEqual(fund_account_balance_old - fund_account_balance_new, _expect)


def sell(self, _who, _value, _pwd, _expect):
    """Sell ``_value`` tokens as ``_who`` and verify token and ETH balances.

    _expect is the expected token decrease of ``_who`` (and increase of the
    fund account). The expected ETH amount is
    ``_expect * sellPrice // config.magnitude``, computed with integer floor
    division so that wei-scale values are not rounded through a float.
    """
    _who = Web3.toChecksumAddress(_who)
    # Record the seller's token and ETH balances.
    balance_old = self.sht.functions.balanceOf(_who).call()
    eth_balance_old = self.w3.eth.getBalance(_who)
    # Record the fund account's token balance.
    fund_account = self.sht.functions.fundAccount().call()
    fund_account_balance_old = self.sht.functions.balanceOf(fund_account).call()
    # Record the contract's ETH balance.
    eth_sht_account_balance_old = self.w3.eth.getBalance(
        Web3.toChecksumAddress(sts.SIBBAY_SHT_ADDRESS)
    )
    # Unlock the seller account and sell _value tokens.
    self.w3.personal.unlockAccount(_who, _pwd)
    tx_hash = self.sht.functions.sell(_value).transact(
        {"from": _who, "gas": config.gas, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("sell gas used:", ret["gasUsed"])
    sell_price = self.sht.functions.sellPrice().call()
    eth_expected = _expect * sell_price // config.magnitude
    # The seller must have lost the expected amount of tokens.
    balance_new = self.sht.functions.balanceOf(_who).call()
    self.assertEqual(balance_old - balance_new, _expect)
    # The seller's ETH gain, with the gas fee added back, must match.
    eth_balance_new = self.w3.eth.getBalance(_who)
    self.assertEqual(
        eth_balance_new - eth_balance_old + ret["gasUsed"] * config.gas_price,
        eth_expected,
    )
    # Read the fund account's token balance and the contract's ETH balance.
    fund_account_balance_new = self.sht.functions.balanceOf(fund_account).call()
    eth_sht_account_balance_new = self.w3.eth.getBalance(
        Web3.toChecksumAddress(sts.SIBBAY_SHT_ADDRESS)
    )
    self.assertEqual(fund_account_balance_new - fund_account_balance_old, _expect)
    self.assertEqual(eth_sht_account_balance_old - eth_sht_account_balance_new, eth_expected)


def pause(self, _owner, _pwd):
    """Pause the contract as ``_owner``; no-op if it is already paused."""
    _owner = Web3.toChecksumAddress(_owner)
    # Check the contract state.
    if self.sht.functions.paused().call():
        return
    # Unlock the owner account and pause the contract.
    self.w3.personal.unlockAccount(_owner, _pwd)
    tx_hash = self.sht.functions.pause().transact(
        {"from": _owner, "gas": config.gas, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("pause gas used:", ret["gasUsed"])
    # Check the contract state again.
    self.assertEqual(self.sht.functions.paused().call(), True)


def unpause(self, _owner, _pwd):
    """Unpause the contract as ``_owner``; no-op if it is not paused."""
    _owner = Web3.toChecksumAddress(_owner)
    # Check the contract state.
    if not self.sht.functions.paused().call():
        return
    # Unlock the owner account and unpause the contract.
    self.w3.personal.unlockAccount(_owner, _pwd)
    tx_hash = self.sht.functions.unpause().transact(
        {"from": _owner, "gas": config.gas, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("unpause gas used:", ret["gasUsed"])
    # Check the contract state again.
    self.assertEqual(self.sht.functions.paused().call(), False)


def add_administrator(self, _owner, _who, _pwd):
    """Add ``_who`` to the admin list as ``_owner``; no-op if already listed."""
    _owner = Web3.toChecksumAddress(_owner)
    _who = Web3.toChecksumAddress(_who)
    # Check the administrator status.
    if self.sht.functions.adminList(_who).call():
        return
    # Unlock the owner account and add _who as administrator.
    self.w3.personal.unlockAccount(_owner, _pwd)
    tx_hash = self.sht.functions.addAdministrator(_who).transact(
        {"from": _owner, "gas": config.gas, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("add_administrator gas used:", ret["gasUsed"])
    # Check the administrator status again.
    self.assertEqual(self.sht.functions.adminList(_who).call(), True)


def del_administrator(self, _owner, _who, _pwd):
    """Remove ``_who`` from the admin list as ``_owner``; no-op if not listed."""
    _owner = Web3.toChecksumAddress(_owner)
    _who = Web3.toChecksumAddress(_who)
    # Check the administrator status.
    if not self.sht.functions.adminList(_who).call():
        return
    # Unlock the owner account and remove _who from the administrators.
    self.w3.personal.unlockAccount(_owner, _pwd)
    tx_hash = self.sht.functions.delAdministrator(_who).transact(
        {"from": _owner, "gas": config.gas, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("del_administrator gas used:", ret["gasUsed"])
    # Check the administrator status again.
    self.assertEqual(self.sht.functions.adminList(_who).call(), False)


def withdraw(self, _owner, _pwd):
    """Withdraw the contract's ETH as ``_owner``; no-op if its balance is 0."""
    _owner = Web3.toChecksumAddress(_owner)
    contract_address = Web3.toChecksumAddress(sts.SIBBAY_SHT_ADDRESS)
    # Check the contract's ETH balance.
    if self.w3.eth.getBalance(contract_address) == 0:
        return
    # Unlock the owner account and withdraw the contract balance.
    self.w3.personal.unlockAccount(_owner, _pwd)
    tx_hash = self.sht.functions.withdraw().transact(
        {"from": _owner, "gas": config.gas, "gasPrice": config.gas_price}
    )
    # Wait for the transaction to be confirmed.
    ret = self.wait_tx_confirm(tx_hash)
    print("withdraw gas used:", ret["gasUsed"])
    # The contract must hold no ETH afterwards.
    self.assertEqual(self.w3.eth.getBalance(contract_address), 0)


def balance_of(self, address, _expect):
    """Assert that the token balance of ``address`` equals ``_expect``."""
    _who = Web3.toChecksumAddress(address)
    balance_value = self.sht.functions.balanceOf(_who).call()
    self.assertEqual(balance_value, _expect)


def eth_balance_of(self, address, _expect):
    """Assert that the ETH balance of ``address`` equals ``_expect``."""
    ret = self.w3.eth.getBalance(Web3.toChecksumAddress(address))
    self.assertEqual(ret, _expect)


# The body of the next method is elided ("...") in this excerpt.
def eth_balance_of_sht_address(self, _expect):...
node_cli.py
Source:node_cli.py
# NOTE(review): the statements below are the tail of an initialisation method
# whose `def` line precedes this excerpt; they are kept as-is (only reformatted).
print("%s" % cmd)
self.pexpect = pexpect.spawn(cmd, timeout=4)
# Add delay to ensure that the process is ready to receive commands.
time.sleep(0.2)
self._expect('spinel-cli >')
self.debug(int(os.getenv('DEBUG', '0')))


def _expect(self, pattern, timeout=-1, *args, **kwargs):
    """Process simulator events until the expected pattern shows up.

    The wait is sliced into 0.1 s pexpect calls; after every slice that times
    out the simulator is advanced with ``go(0)``. ``timeout=-1`` means "use
    the default timeout of the pexpect session".

    Returns whatever ``self.pexpect.expect`` returns.
    Raises ``pexpect.TIMEOUT`` once the whole budget is used up.
    """
    if timeout == -1:
        timeout = self.pexpect.timeout
    assert timeout > 0
    while timeout > 0:
        try:
            return self.pexpect.expect(pattern, 0.1, *args, **kwargs)
        except pexpect.TIMEOUT:
            timeout -= 0.1
            self.simulator.go(0)
            if timeout <= 0:
                # Budget exhausted: propagate the last TIMEOUT to the caller.
                raise


def __init_soc(self, nodeid):
    """Initialize a System-on-a-chip node connected via UART."""
    import fdpexpect

    # Node N is opened on /dev/ttyUSB<(N - 1) * 2>.
    serialPort = '/dev/ttyUSB%d' % ((nodeid - 1) * 2)
    self.pexpect = fdpexpect.fdspawn(
        os.open(serialPort, os.O_RDWR | os.O_NONBLOCK | os.O_NOCTTY)
    )


def __del__(self):
    self.destroy()


def destroy(self):
    """Ask the CLI process to exit, if a session exists and is still alive.

    Safe to call repeatedly and from ``__del__``: it does nothing when
    ``self.pexpect`` was never set or has already been cleared.
    """
    # getattr: __del__ may run on an object whose __init__ failed early.
    session = getattr(self, 'pexpect', None)
    if not session:
        return
    # Sessions exposing a `proc` attribute are probed through it, all others
    # through pexpect's own isalive().
    if hasattr(session, 'proc'):
        alive = session.proc.poll() is None
    else:
        alive = session.isalive()
    if alive:
        print("%d: exit" % self.nodeid)
        session.send('exit\n')
        session.expect(pexpect.EOF)
        # NOTE(review): indentation was lost in this excerpt; the reset is
        # assumed to belong to the live-process branch -- confirm upstream.
        self.pexpect = None


def read_cert_messages_in_commissioning_log(self, timeout=-1):
    """Get the log of the traffic after DTLS handshake.

    Returns a list of up to four ``(direction, type, payload)`` tuples as
    produced by ``_extract_cert_message``. Collection stops at the first
    error (timeout, EOF, unparsable log entry).
    """
    format_str = br"=+?\[\[THCI\].*?type=%s.*?\].*?=+?[\s\S]+?-{40,}"
    dummy_format_str = br"\[THCI\].*?type=%s.*?"
    alternatives = (
        format_str % br"JOIN_FIN\.req",
        format_str % br"JOIN_FIN\.rsp",
        dummy_format_str % br"JOIN_ENT\.ntf",
        dummy_format_str % br"JOIN_ENT\.rsp",
    )
    # One capture group per message kind: (a)|(b)|(c)|(d)
    pattern = b"|".join(b"(" + alt + b")" for alt in alternatives)

    messages = []
    # There are at most 4 cert messages both for joiner and commissioner
    for _ in range(4):
        try:
            self._expect(pattern, timeout=timeout)
            log = self.pexpect.match.group(0)
            messages.append(self._extract_cert_message(log))
        except Exception:
            # Best effort: stop at the first failure, but let
            # KeyboardInterrupt / SystemExit through.
            break
    return messages


def _extract_cert_message(self, log):
    """Parse one THCI log entry (bytes) into ``(direction, type, payload)``.

    ``direction`` and ``type`` are the bytes following ``direction=`` and
    ``type=``. For JOIN_FIN.req / JOIN_FIN.rsp the payload is rebuilt from
    the ``| xx xx .. |`` hex-dump rows and must be exactly ``len=`` bytes
    long; for every other type it is empty.

    Raises AssertionError when a field is missing or the length mismatches.
    """
    res = re.search(br"direction=\w+", log)
    assert res
    direction = res.group(0).split(b'=')[1].strip()

    res = re.search(br"type=\S+", log)
    assert res
    msg_type = res.group(0).split(b'=')[1].strip()

    payload = bytearray([])
    payload_len = 0
    if msg_type in [b"JOIN_FIN.req", b"JOIN_FIN.rsp"]:
        res = re.search(br"len=\d+", log)
        assert res
        payload_len = int(res.group(0).split(b'=')[1].strip())

        hex_pattern = br"\|(\s([0-9a-fA-F]{2}|\.\.))+?\s+?\|"
        while True:
            res = re.search(hex_pattern, log)
            if not res:
                break
            # Drop the enclosing '|' and skip padding cells ("..").
            data = [
                int(cell, 16)
                for cell in res.group(0)[1:-1].split(b' ')
                if cell and cell != b'..'
            ]
            payload += bytearray(data)
            # Resume at the closing '|' so it can open the next row.
            log = log[res.end() - 1:]

    assert len(payload) == payload_len
    return (direction, msg_type, payload)


def send_command(self, cmd, go=True):
    """Write ``cmd`` to the CLI; with ``go`` also advance the simulator for this node."""
    print("%d: %s" % (self.nodeid, cmd))
    self.pexpect.send(cmd + '\n')
    if go:
        self.simulator.go(0, nodeid=self.nodeid)
    sys.stdout.flush()


def get_commands(self):
    """Send '?' and return every token printed between 'Commands:' and 'Done'."""
    self.send_command('?')
    self._expect('Commands:')
    commands = []
    # Index 0 is 'Done'; any other match is one command token.
    while self._expect(['Done', r'(\S+)']) != 0:
        commands.append(self.pexpect.match.groups()[0])
    return commands


def set_mode(self, mode):
    cmd = 'mode ' + mode
    self.send_command(cmd)
    self._expect('Done')


def debug(self, level):
    # `debug` command will not trigger interaction with simulator
    self.send_command('debug ' + str(level), go=False)


def interface_up(self):
    self.send_command('ifconfig up')
    self._expect('Done')


def interface_down(self):
    self.send_command('ifconfig down')
    self._expect('Done')


def thread_start(self):
    self.send_command('thread start')
    self._expect('Done')


def thread_stop(self):
    self.send_command('thread stop')
    self._expect('Done')


def commissioner_start(self):
    cmd = 'commissioner start'
    self.send_command(cmd)
    self._expect('Done')


def commissioner_add_joiner(self, addr, psk):
    cmd = 'commissioner joiner add ' + addr + ' ' + psk
    self.send_command(cmd)
    self._expect('Done')


def joiner_start(self, pskd='', provisioning_url=''):
    cmd = 'joiner start ' + pskd + ' ' + provisioning_url
    self.send_command(cmd)
    self._expect('Done')


def clear_whitelist(self):
    cmd = 'macfilter addr clear'
    self.send_command(cmd)
    self._expect('Done')


def enable_whitelist(self):
    cmd = 'macfilter addr whitelist'
    self.send_command(cmd)
    self._expect('Done')


def disable_whitelist(self):
    cmd = 'macfilter addr disable'
    self.send_command(cmd)
    self._expect('Done')


def add_whitelist(self, addr, rssi=None):
    """Add ``addr`` to the MAC filter, optionally with a fixed ``rssi``."""
    cmd = 'macfilter addr add ' + addr
    if rssi is not None:
        cmd += ' ' + str(rssi)
    self.send_command(cmd)
    self._expect('Done')


def remove_whitelist(self, addr):
    cmd = 'macfilter addr remove ' + addr
    self.send_command(cmd)
    self._expect('Done')


def get_addr16(self):
    """Return the value printed by 'rloc16' as an int (parsed as hex)."""
    self.send_command('rloc16')
    # A single pattern always matches at index 0, so no index check is needed.
    self._expect('([0-9a-fA-F]{4})')
    addr16 = int(self.pexpect.match.groups()[0], 16)
    self._expect('Done')
    return addr16


def get_router_id(self):
    """Return the RLOC16 shifted right by 10 bits."""
    rloc16 = self.get_addr16()
    return (rloc16 >> 10)


def get_addr64(self):
    """Return the 16 hex digits printed by 'extaddr' as a str."""
    self.send_command('extaddr')
    self._expect('([0-9a-fA-F]{16})')
    addr64 = self.pexpect.match.groups()[0].decode("utf-8")
    self._expect('Done')
    return addr64


def get_eui64(self):
    """Return the 16 hex digits printed by 'eui64' as a str."""
    self.send_command('eui64')
    self._expect('([0-9a-fA-F]{16})')
    addr64 = self.pexpect.match.groups()[0].decode("utf-8")
    self._expect('Done')
    return addr64


def get_joiner_id(self):
    """Return the 16 hex digits printed by 'joinerid' as a str."""
    self.send_command('joinerid')
    self._expect('([0-9a-fA-F]{16})')
    addr = self.pexpect.match.groups()[0].decode("utf-8")
    self._expect('Done')
    return addr
# NOTE(review): reconstructed from a flattened listing in which each source
# line carried a fused line number and all indentation had been lost.  Block
# nesting below is inferred from syntax; confirm against the upstream file.
# Every function takes `self`, so these are presumably methods of a class
# whose header lies outside this chunk -- indent one level when merging back.
#
# Getters pass a single pattern to self._expect(), which returns the matched
# index or raises, so no index check is needed before reading the match.


def get_channel(self):
    """Send ``channel`` and return the reported value as an int."""
    self.send_command('channel')
    self._expect(r'(\d+)\r?\n')
    channel = int(self.pexpect.match.groups()[0])
    self._expect('Done')
    return channel


def set_channel(self, channel):
    """Send ``channel <channel>`` and wait for ``Done``."""
    self.send_command('channel %d' % channel)
    self._expect('Done')


def get_masterkey(self):
    """Send ``masterkey`` and return the 32-hex-digit reply as text."""
    self.send_command('masterkey')
    self._expect('([0-9a-fA-F]{32})')
    masterkey = self.pexpect.match.groups()[0].decode("utf-8")
    self._expect('Done')
    return masterkey


def set_masterkey(self, masterkey):
    """Send ``masterkey <masterkey>`` and wait for ``Done``."""
    self.send_command('masterkey ' + masterkey)
    self._expect('Done')


def get_key_sequence_counter(self):
    """Send ``keysequence counter`` and return the reported value as an int."""
    self.send_command('keysequence counter')
    self._expect(r'(\d+)\r?\n')
    key_sequence_counter = int(self.pexpect.match.groups()[0])
    self._expect('Done')
    return key_sequence_counter


def set_key_sequence_counter(self, key_sequence_counter):
    """Send ``keysequence counter <n>`` and wait for ``Done``."""
    self.send_command('keysequence counter %d' % key_sequence_counter)
    self._expect('Done')


def set_key_switch_guardtime(self, key_switch_guardtime):
    """Send ``keysequence guardtime <n>`` and wait for ``Done``."""
    self.send_command('keysequence guardtime %d' % key_switch_guardtime)
    self._expect('Done')


def set_network_id_timeout(self, network_id_timeout):
    """Send ``networkidtimeout <n>`` and wait for ``Done``."""
    self.send_command('networkidtimeout %d' % network_id_timeout)
    self._expect('Done')


def get_network_name(self):
    """Send ``networkname`` and return the last token seen before ``Done``.

    Returns None when ``Done`` is matched before any token (the original
    raised UnboundLocalError in that case).
    """
    self.send_command('networkname')
    network_name = None
    while True:
        i = self._expect(['Done', r'(\S+)'])
        if i == 0:
            break
        network_name = self.pexpect.match.groups()[0].decode('utf-8')
    return network_name


def set_network_name(self, network_name):
    """Send ``networkname <network_name>`` and wait for ``Done``."""
    self.send_command('networkname ' + network_name)
    self._expect('Done')


def get_panid(self):
    """Send ``panid`` and return the 4-hex-digit reply parsed as an int."""
    self.send_command('panid')
    self._expect('([0-9a-fA-F]{4})')
    panid = int(self.pexpect.match.groups()[0], 16)
    self._expect('Done')
    return panid


def set_panid(self, panid):
    """Send ``panid <panid>`` (formatted with ``%d``) and wait for ``Done``."""
    self.send_command('panid %d' % panid)
    self._expect('Done')


def get_partition_id(self):
    """Send ``leaderpartitionid`` and return the matched digits.

    The match group is returned as-is; it is not converted to int.
    """
    self.send_command('leaderpartitionid')
    self._expect(r'(\d+)\r?\n')
    partition_id = self.pexpect.match.groups()[0]
    self._expect('Done')
    return partition_id


def set_partition_id(self, partition_id):
    """Send ``leaderpartitionid <n>`` and wait for ``Done``."""
    self.send_command('leaderpartitionid %d' % partition_id)
    self._expect('Done')


def set_router_upgrade_threshold(self, threshold):
    """Send ``routerupgradethreshold <n>`` and wait for ``Done``."""
    self.send_command('routerupgradethreshold %d' % threshold)
    self._expect('Done')


def set_router_downgrade_threshold(self, threshold):
    """Send ``routerdowngradethreshold <n>`` and wait for ``Done``."""
    self.send_command('routerdowngradethreshold %d' % threshold)
    self._expect('Done')


def release_router_id(self, router_id):
    """Send ``releaserouterid <n>`` and wait for ``Done``."""
    self.send_command('releaserouterid %d' % router_id)
    self._expect('Done')


def get_state(self):
    """Send ``state`` and return which of the known state names matched."""
    states = ['detached', 'child', 'router', 'leader']
    self.send_command('state')
    match = self._expect(states)
    self._expect('Done')
    return states[match]


def set_state(self, state):
    """Send ``state <state>`` and wait for ``Done``."""
    self.send_command('state ' + state)
    self._expect('Done')


def get_timeout(self):
    """Send ``childtimeout`` and return the matched digits (not an int)."""
    self.send_command('childtimeout')
    self._expect(r'(\d+)\r?\n')
    timeout = self.pexpect.match.groups()[0]
    self._expect('Done')
    return timeout


def set_timeout(self, timeout):
    """Send ``childtimeout <n>`` and wait for ``Done``."""
    self.send_command('childtimeout %d' % timeout)
    self._expect('Done')


def set_max_children(self, number):
    """Send ``childmax <n>`` and wait for ``Done``."""
    self.send_command('childmax %d' % number)
    self._expect('Done')


def get_weight(self):
    """Send ``leaderweight`` and return the matched digits (not an int)."""
    self.send_command('leaderweight')
    self._expect(r'(\d+)\r?\n')
    weight = self.pexpect.match.groups()[0]
    self._expect('Done')
    return weight


def set_weight(self, weight):
    """Send ``leaderweight <n>`` and wait for ``Done``."""
    self.send_command('leaderweight %d' % weight)
    self._expect('Done')


def add_ipaddr(self, ipaddr):
    """Send ``ipaddr add <ipaddr>`` and wait for ``Done``."""
    self.send_command('ipaddr add ' + ipaddr)
    self._expect('Done')


def get_addrs(self):
    """Send ``ipaddr`` and return every address line up to ``Done`` as text."""
    addrs = []
    self.send_command('ipaddr')
    while True:
        i = self._expect([r'(\S+(:\S*)+)\r?\n', 'Done'])
        if i == 0:
            addrs.append(self.pexpect.match.groups()[0].decode("utf-8"))
        elif i == 1:
            break
    return addrs


def get_addr(self, prefix):
    """Return the first configured address inside *prefix*, or None.

    The address is returned in exploded (fully expanded) form.
    """
    network = ipaddress.ip_network(u'%s' % str(prefix))
    for addr in self.get_addrs():
        # ipaddress wants text (or packed bytes), so normalise first.
        if isinstance(addr, bytearray):
            addr = bytes(addr)
        elif isinstance(addr, str) and sys.version_info[0] == 2:
            addr = addr.decode("utf-8")
        ipv6_address = ipaddress.ip_address(addr)
        if ipv6_address in network:
            return ipv6_address.exploded
    return None


def get_eidcaches(self):
    """Send ``eidcache`` and return a list of ``(eid, rloc)`` text pairs."""
    eidcaches = []
    self.send_command('eidcache')
    while True:
        i = self._expect([r'([a-fA-F0-9\:]+) ([a-fA-F0-9]+)\r?\n', 'Done'])
        if i == 0:
            eid = self.pexpect.match.groups()[0].decode("utf-8")
            rloc = self.pexpect.match.groups()[1].decode("utf-8")
            eidcaches.append((eid, rloc))
        elif i == 1:
            break
    return eidcaches


def add_service(self, enterpriseNumber, serviceData, serverData):
    """Send ``service add`` with the three string arguments; wait for ``Done``."""
    cmd = 'service add ' + enterpriseNumber + ' ' + serviceData + ' ' + serverData
    self.send_command(cmd)
    self._expect('Done')


def remove_service(self, enterpriseNumber, serviceData):
    """Send ``service remove`` with the two string arguments; wait for ``Done``."""
    cmd = 'service remove ' + enterpriseNumber + ' ' + serviceData
    self.send_command(cmd)
    self._expect('Done')


def __getLinkLocalAddress(self):
    """Return the first address matching the link-local pattern, or None."""
    for ip6Addr in self.get_addrs():
        if re.match(config.LINK_LOCAL_REGEX_PATTERN, ip6Addr, re.I):
            return ip6Addr
    return None


def __getGlobalAddress(self):
    """Return all addresses matching none of the link-local, mesh-local
    prefix or routing-locator patterns (possibly an empty list)."""
    global_address = []
    for ip6Addr in self.get_addrs():
        if (not re.match(config.LINK_LOCAL_REGEX_PATTERN, ip6Addr, re.I)) and \
                (not re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I)) and \
                (not re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I)):
            global_address.append(ip6Addr)
    return global_address


def __getRloc(self):
    """Return the first mesh-local routing-locator address that does not
    match the ALOC flag pattern, or None."""
    for ip6Addr in self.get_addrs():
        if re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I) and \
                re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I) and \
                not re.match(config.ALOC_FLAG_REGEX_PATTERN, ip6Addr, re.I):
            return ip6Addr
    return None


def __getAloc(self):
    """Return all mesh-local routing-locator addresses that match the ALOC
    flag pattern (possibly an empty list)."""
    aloc = []
    for ip6Addr in self.get_addrs():
        if re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I) and \
                re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I) and \
                re.match(config.ALOC_FLAG_REGEX_PATTERN, ip6Addr, re.I):
            aloc.append(ip6Addr)
    return aloc


def __getMleid(self):
    """Return the first mesh-local address that is not a routing locator,
    or None."""
    for ip6Addr in self.get_addrs():
        if re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I) and \
                not re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I):
            return ip6Addr
    return None


def get_ip6_address(self, address_type):
    """Get specific type of IPv6 address configured on thread device.

    Args:
        address_type: the config.ADDRESS_TYPE type of IPv6 address.

    Returns:
        IPv6 address string for LINK_LOCAL, RLOC and ML_EID; a list of
        address strings for GLOBAL and ALOC; None for any other type or
        when no single address matches.
    """
    if address_type == config.ADDRESS_TYPE.LINK_LOCAL:
        return self.__getLinkLocalAddress()
    elif address_type == config.ADDRESS_TYPE.GLOBAL:
        return self.__getGlobalAddress()
    elif address_type == config.ADDRESS_TYPE.RLOC:
        return self.__getRloc()
    elif address_type == config.ADDRESS_TYPE.ALOC:
        return self.__getAloc()
    elif address_type == config.ADDRESS_TYPE.ML_EID:
        return self.__getMleid()
    return None


def get_context_reuse_delay(self):
    """Send ``contextreusedelay`` and return the matched digits (not an int)."""
    self.send_command('contextreusedelay')
    self._expect(r'(\d+)\r?\n')
    delay = self.pexpect.match.groups()[0]
    self._expect('Done')
    return delay


def set_context_reuse_delay(self, delay):
    """Send ``contextreusedelay <n>`` and wait for ``Done``."""
    self.send_command('contextreusedelay %d' % delay)
    self._expect('Done')


def add_prefix(self, prefix, flags, prf='med'):
    """Send ``prefix add <prefix> <flags> <prf>`` and wait for ``Done``."""
    self.send_command('prefix add ' + prefix + ' ' + flags + ' ' + prf)
    self._expect('Done')


def remove_prefix(self, prefix):
    """Send ``prefix remove <prefix>`` and wait for ``Done``."""
    self.send_command('prefix remove ' + prefix)
    self._expect('Done')


def add_route(self, prefix, prf='med'):
    """Send ``route add <prefix> <prf>`` and wait for ``Done``."""
    self.send_command('route add ' + prefix + ' ' + prf)
    self._expect('Done')


def remove_route(self, prefix):
    """Send ``route remove <prefix>`` and wait for ``Done``."""
    self.send_command('route remove ' + prefix)
    self._expect('Done')


def register_netdata(self):
    """Send ``netdataregister`` and wait for ``Done``."""
    self.send_command('netdataregister')
    self._expect('Done')


def energy_scan(self, mask, count, period, scan_duration, ipaddr):
    """Send ``commissioner energy ...`` and wait for an ``Energy:`` line."""
    cmd = ('commissioner energy ' + str(mask) + ' ' + str(count) + ' '
           + str(period) + ' ' + str(scan_duration) + ' ' + ipaddr)
    self.send_command(cmd)
    # With a virtual-time simulator the 8 s are advanced explicitly, so only
    # a short wait for the output is used; otherwise wait the full 8 s.
    if isinstance(self.simulator, simulator.VirtualTime):
        self.simulator.go(8)
        timeout = 1
    else:
        timeout = 8
    self._expect('Energy:', timeout=timeout)


def panid_query(self, panid, mask, ipaddr):
    """Send ``commissioner panid ...`` and wait for a ``Conflict:`` line."""
    cmd = 'commissioner panid ' + str(panid) + ' ' + str(mask) + ' ' + ipaddr
    self.send_command(cmd)
    if isinstance(self.simulator, simulator.VirtualTime):
        self.simulator.go(8)
        timeout = 1
    else:
        timeout = 8
    self._expect('Conflict:', timeout=timeout)


def scan(self):
    """Send ``scan`` and return the match-group tuple of every result row."""
    self.send_command('scan')
    results = []
    while True:
        i = self._expect([r'\|\s(\S+)\s+\|\s(\S+)\s+\|\s([0-9a-fA-F]{4})\s\|\s([0-9a-fA-F]{16})\s\|\s(\d+)\r?\n',
                          'Done'])
        if i == 0:
            results.append(self.pexpect.match.groups())
        else:
            break
    return results


def ping(self, ipaddr, num_responses=1, size=None, timeout=5):
    """Ping *ipaddr*; return True once *num_responses* distinct responders
    have answered, False if waiting for them times out."""
    cmd = 'ping ' + ipaddr
    if size is not None:
        cmd += ' ' + str(size)
    self.send_command(cmd)
    if isinstance(self.simulator, simulator.VirtualTime):
        self.simulator.go(timeout)
    result = True
    try:
        # Distinct "from <addr>:" sources seen so far.
        responders = set()
        while len(responders) < num_responses:
            self._expect([r'from (\S+):'])
            responders.add(self.pexpect.match.groups()[0])
        self._expect('\n')
    except (pexpect.TIMEOUT, socket.timeout):
        result = False
        # NOTE(review): nesting of this re-sync under the except branch is
        # inferred from the flattened listing -- confirm against upstream.
        if isinstance(self.simulator, simulator.VirtualTime):
            self.simulator.sync_devices()
    return result


def reset(self):
    """Send ``reset`` and sleep 0.1 s; no ``Done`` is awaited."""
    self.send_command('reset')
    time.sleep(0.1)


def set_router_selection_jitter(self, jitter):
    """Send ``routerselectionjitter <n>`` and wait for ``Done``."""
    self.send_command('routerselectionjitter %d' % jitter)
    self._expect('Done')


def set_active_dataset(self, timestamp, panid=None, channel=None, channel_mask=None, master_key=None):
    """Clear the dataset buffer, set the given fields and commit as active.

    Fields left as None are not sent.
    """
    self.send_command('dataset clear')
    self._expect('Done')
    self.send_command('dataset activetimestamp %d' % timestamp)
    self._expect('Done')
    if panid is not None:
        self.send_command('dataset panid %d' % panid)
        self._expect('Done')
    if channel is not None:
        self.send_command('dataset channel %d' % channel)
        self._expect('Done')
    if channel_mask is not None:
        self.send_command('dataset channelmask %d' % channel_mask)
        self._expect('Done')
    if master_key is not None:
        self.send_command('dataset masterkey ' + master_key)
        self._expect('Done')
    self.send_command('dataset commit active')
    self._expect('Done')


def set_pending_dataset(self, pendingtimestamp, activetimestamp, panid=None, channel=None):
    """Clear the dataset buffer, set the given fields and commit as pending.

    Fields left as None are not sent.
    """
    self.send_command('dataset clear')
    self._expect('Done')
    self.send_command('dataset pendingtimestamp %d' % pendingtimestamp)
    self._expect('Done')
    self.send_command('dataset activetimestamp %d' % activetimestamp)
    self._expect('Done')
    if panid is not None:
        self.send_command('dataset panid %d' % panid)
        self._expect('Done')
    if channel is not None:
        self.send_command('dataset channel %d' % channel)
        self._expect('Done')
    self.send_command('dataset commit pending')
    self._expect('Done')


def announce_begin(self, mask, count, period, ipaddr):
    """Send ``commissioner announce ...`` and wait for ``Done``."""
    cmd = 'commissioner announce ' + str(mask) + ' ' + str(count) + ' ' + str(period) + ' ' + ipaddr
    self.send_command(cmd)
    self._expect('Done')


def send_mgmt_active_set(self, active_timestamp=None, channel=None, channel_mask=None, extended_panid=None,
                         panid=None, master_key=None, mesh_local=None, network_name=None, binary=None):
    """Send ``dataset mgmtsetcommand active`` with every non-None field.

    Each field is appended as ``<keyword> <value> `` in a fixed order; the
    command therefore ends with a trailing space.
    """
    cmd = 'dataset mgmtsetcommand active '
    if active_timestamp is not None:
        cmd += 'activetimestamp %d ' % active_timestamp
    if channel is not None:
        cmd += 'channel %d ' % channel
    if channel_mask is not None:
        cmd += 'channelmask %d ' % channel_mask
    if extended_panid is not None:
        cmd += 'extpanid ' + extended_panid + ' '
    if panid is not None:
        cmd += 'panid %d ' % panid
    if master_key is not None:
        cmd += 'masterkey ' + master_key + ' '
    if mesh_local is not None:
        cmd += 'localprefix ' + mesh_local + ' '
    if network_name is not None:
        cmd += 'networkname ' + network_name + ' '
    if binary is not None:
        cmd += 'binary ' + binary + ' '
    self.send_command(cmd)
    self._expect('Done')


def send_mgmt_pending_set(self, pending_timestamp=None, active_timestamp=None, delay_timer=None, channel=None,
                          panid=None, master_key=None, mesh_local=None, network_name=None):
    """Send ``dataset mgmtsetcommand pending`` with every non-None field.

    Each field is appended as ``<keyword> <value> `` in a fixed order; the
    command therefore ends with a trailing space.
    """
    cmd = 'dataset mgmtsetcommand pending '
    if pending_timestamp is not None:
        cmd += 'pendingtimestamp %d ' % pending_timestamp
    if active_timestamp is not None:
        cmd += 'activetimestamp %d ' % active_timestamp
    if delay_timer is not None:
        cmd += 'delaytimer %d ' % delay_timer
    if channel is not None:
        cmd += 'channel %d ' % channel
    if panid is not None:
        cmd += 'panid %d ' % panid
    if master_key is not None:
        cmd += 'masterkey ' + master_key + ' '
    if mesh_local is not None:
        cmd += 'localprefix ' + mesh_local + ' '
    if network_name is not None:
        cmd += 'networkname ' + network_name + ' '
    self.send_command(cmd)
    self._expect('Done')


def coaps_start_psk(self, psk, pskIdentity):
    """Send ``coaps set psk <psk> <pskIdentity>`` then ``coaps start``."""
    self.send_command('coaps set psk ' + psk + ' ' + pskIdentity)
    self._expect('Done')
    self.send_command('coaps start')
    self._expect('Done')


def coaps_start_x509(self):
    """Send ``coaps set x509`` then ``coaps start``."""
    self.send_command('coaps set x509')
    self._expect('Done')
    self.send_command('coaps start')
    self._expect('Done')


def coaps_set_resource_path(self, path):
    """Send ``coaps resource <path>`` and wait for ``Done``."""
    self.send_command('coaps resource ' + path)
    self._expect('Done')


def coaps_stop(self):
    """Send ``coaps stop`` and wait (up to 5 s of simulated or real time)
    for ``Done``."""
    self.send_command('coaps stop')
    if isinstance(self.simulator, simulator.VirtualTime):
        self.simulator.go(5)
        timeout = 1
    else:
        timeout = 5
    self._expect('Done', timeout=timeout)


def coaps_connect(self, ipaddr):
    """Send ``coaps connect <ipaddr>`` and wait for the connected banner."""
    self.send_command('coaps connect ' + ipaddr)
    if isinstance(self.simulator, simulator.VirtualTime):
        self.simulator.go(5)
        timeout = 1
    else:
        timeout = 5
    self._expect('CoAP Secure connected!', timeout=timeout)


def coaps_disconnect(self):
    """Send ``coaps disconnect``, wait for ``Done``, then advance the
    simulator by 5."""
    self.send_command('coaps disconnect')
    self._expect('Done')
    self.simulator.go(5)


def coaps_get(self):
    # NOTE(review): the listing is cut off ("...") right after the timeout
    # selection below; the remainder of this method is not visible here.
    cmd = 'coaps get test'
    self.send_command(cmd)
    if isinstance(self.simulator, simulator.VirtualTime):
        self.simulator.go(5)
        timeout = 1
    else:
        timeout = 5
    # ... (truncated in the source listing)
test_videosize.py
Source:test_videosize.py
...38 dar[0], dar[1])39 else:40 result += " "41 return result42 def _expect(self, iw, ih, ipar, ow, oh, opar, mw, mh, pref, ew, eh, epar):43 rw, rh, rpar = videosize.getVideoSize(iw, ih, ipar,44 ow, oh, opar,45 mw, mh, pref)46 #print "IN: %s - PREF: %s - OUT: %s" % (self._format(iw, ih, ipar),47 # self._format(ow, oh, opar),48 # self._format(rw, rh, rpar))49 idar = (float(iw) * ipar[0]) / (float(ih) * ipar[1])50 rdar = (float(rw) * rpar[0]) / (float(rh) * rpar[1])51 self.assertTrue(abs(idar - rdar) < 0.03, "Display Aspect Ratio Changed")52 self.assertEquals((rw, rh, rpar), (ew, eh, epar))53 def testMaxSizeWithPreferredSize(self):54 self._expect(320, 240, (1, 1),55 200, None, (1, 1),56 300, 200, None,57 200, 150, (1, 1))58 self._expect(320, 240, (1, 1),59 200, None, (1, 1),60 150, 100, None,61 133, 100, (1, 1))62 self._expect(320, 240, (1, 1),63 200, None, (1, 1),64 100, 50, None,65 67, 50, (1, 1))66 self._expect(320, 240, (1, 1),67 None, 180, (1, 1),68 300, 200, None,69 240, 180, (1, 1))70 self._expect(320, 240, (1, 1),71 None, 180, (1, 1),72 150, 100, None,73 133, 100, (1, 1))74 self._expect(320, 240, (1, 1),75 None, 180, (1, 1),76 100, 50, None,77 67, 50, (1, 1))78 self._expect(320, 240, (1, 1),79 180, 180, (1, 1),80 300, 200, None,81 180, 135, (1, 1))82 self._expect(320, 240, (1, 1),83 180, 180, (1, 1),84 150, 100, None,85 133, 100, (1, 1))86 self._expect(320, 240, (1, 1),87 180, 180, (1, 1),88 100, 50, None,89 67, 50, (1, 1))90 def testMaxSizeWithoutPreferredSize(self):91 self._expect(320, 240, (1, 1),92 None, None, (1, 1),93 200, 300, VideoScaleMethodEnum.upscale,94 200, 150, (1, 1))95 self._expect(320, 240, (1, 1),96 None, None, (1, 2),97 200, 300, VideoScaleMethodEnum.upscale,98 200, 75, (1, 2))99 self._expect(320, 240, (1, 1),100 None, None, (1, 3),101 200, 300, VideoScaleMethodEnum.upscale,102 200, 50, (1, 3))103 self._expect(320, 240, (1, 1),104 None, None, (2, 1),105 200, 300, VideoScaleMethodEnum.upscale,106 200, 300, (2, 1))107 self._expect(320, 240, 
(1, 1),108 None, None, (3, 1),109 200, 300, VideoScaleMethodEnum.upscale,110 133, 300, (3, 1))111 self._expect(320, 240, (1, 1),112 None, None, (1, 1),113 400, 500, VideoScaleMethodEnum.upscale,114 320, 240, (1, 1))115 self._expect(320, 240, (1, 1),116 None, None, (1, 2),117 400, 500, VideoScaleMethodEnum.upscale,118 400, 150, (1, 2))119 self._expect(320, 240, (1, 1),120 None, None, (1, 3),121 400, 500, VideoScaleMethodEnum.upscale,122 400, 100, (1, 3))123 self._expect(320, 240, (1, 1),124 None, None, (2, 1),125 400, 500, VideoScaleMethodEnum.upscale,126 320, 480, (2, 1))127 self._expect(320, 240, (1, 1),128 None, None, (3, 1),129 400, 500, VideoScaleMethodEnum.upscale,130 222, 500, (3, 1))131 def testPreferredSize(self):132 self._expect(320, 240, (1, 1),133 200, 200, (1, 1),134 None, None, None,135 200, 150, (1, 1))136 self._expect(240, 320, (1, 1),137 200, 200, (1, 1),138 None, None, None,139 150, 200, (1, 1))140 self._expect(320, 240, (3, 4),141 200, 300, (1, 2),142 None, None, None,143 200, 100, (1, 2))144 self._expect(320, 240, (3, 4),145 200, 300, (1, 3),146 None, None, None,147 200, 67, (1, 3))148 self._expect(320, 240, (3, 4),149 200, 300, (2, 1),150 None, None, None,151 150, 300, (2, 1))152 self._expect(320, 240, (3, 4),153 200, 300, (3, 1),154 None, None, None,155 100, 300, (3, 1))156 self._expect(240, 320, (3, 4),157 200, 300, (1, 2),158 None, None, None,159 200, 178, (1, 2))160 self._expect(240, 320, (3, 4),161 200, 300, (1, 3),162 None, None, None,163 200, 119, (1, 3))164 self._expect(240, 320, (3, 4),165 200, 300, (2, 1),166 None, None, None,167 84, 300, (2, 1))168 self._expect(240, 320, (3, 4),169 200, 300, (3, 1),170 None, None, None,171 56, 300, (3, 1))172 self._expect(900, 900, (7, 8),173 800, 200, (1, 1),174 None, None, None,175 175, 200, (1, 1))176 self._expect(900, 900, (7, 8),177 200, 800, (1, 1),178 None, None, None,179 200, 229, (1, 1))180 def testPreferredHeight(self):181 self._expect(320, 240, (1, 1),182 None, 200, (1, 1),183 None, None, 
None,184 267, 200, (1, 1))185 self._expect(240, 320, (1, 1),186 None, 200, (1, 1),187 None, None, None,188 150, 200, (1, 1))189 self._expect(320, 240, (8, 7),190 None, 200, None,191 None, None, None,192 267, 200, (8, 7))193 self._expect(240, 320, (8, 7),194 None, 200, None,195 None, None, None,196 150, 200, (8, 7))197 self._expect(320, 240, (3, 4),198 None, 200, (1, 1),199 None, None, None,200 200, 200, (1, 1))201 self._expect(320, 240, (3, 4),202 None, 200, (1, 2),203 None, None, None,204 400, 200, (1, 2))205 self._expect(320, 240, (3, 4),206 None, 200, (1, 3),207 None, None, None,208 600, 200, (1, 3))209 self._expect(320, 240, (3, 4),210 None, 200, (2, 1),211 None, None, None,212 100, 200, (2, 1))213 self._expect(320, 240, (3, 4),214 None, 200, (3, 1),215 None, None, None,216 67, 200, (3, 1))217 self._expect(240, 320, (3, 4),218 None, 200, (1, 1),219 None, None, None,220 113, 200, (1, 1))221 self._expect(240, 320, (3, 4),222 None, 200, (1, 2),223 None, None, None,224 225, 200, (1, 2))225 self._expect(240, 320, (3, 4),226 None, 200, (1, 3),227 None, None, None,228 338, 200, (1, 3))229 self._expect(240, 320, (3, 4),230 None, 200, (2, 1),231 None, None, None,232 56, 200, (2, 1))233 self._expect(240, 320, (3, 4),234 None, 200, (3, 1),235 None, None, None,236 38, 200, (3, 1))237 def testPreferredWidth(self):238 self._expect(320, 240, (1, 1),239 200, None, (1, 1),240 None, None, None,241 200, 150, (1, 1))242 self._expect(240, 320, (1, 1),243 200, None, (1, 1),244 None, None, None,245 200, 267, (1, 1))246 self._expect(320, 240, (8, 7),247 200, None, None,248 None, None, None,249 200, 150, (8, 7))250 self._expect(240, 320, (8, 7),251 200, None, None,252 None, None, None,253 200, 267, (8, 7))254 self._expect(320, 240, (4, 3),255 200, None, (1, 1),256 None, None, None,257 200, 113, (1, 1))258 self._expect(320, 240, (4, 3),259 200, None, (1, 2),260 None, None, None,261 200, 56, (1, 2))262 self._expect(320, 240, (4, 3),263 200, None, (1, 3),264 None, None, None,265 200, 38, 
(1, 3))266 self._expect(320, 240, (4, 3),267 200, None, (2, 1),268 None, None, None,269 200, 225, (2, 1))270 self._expect(320, 240, (4, 3),271 200, None, (3, 1),272 None, None, None,273 200, 338, (3, 1))274 self._expect(240, 320, (4, 3),275 200, None, (1, 1),276 None, None, None,277 200, 200, (1, 1))278 self._expect(240, 320, (4, 3),279 200, None, (1, 2),280 None, None, None,281 200, 100, (1, 2))282 self._expect(240, 320, (4, 3),283 200, None, (1, 3),284 None, None, None,285 200, 67, (1, 3))286 self._expect(240, 320, (4, 3),287 200, None, (2, 1),288 None, None, None,289 200, 400, (2, 1))290 self._expect(240, 320, (4, 3),291 200, None, (3, 1),292 None, None, None,293 200, 600, (3, 1))294 def testDefaultPreferredMethod(self):295 #The default preferred method is "height"296 self._expect(320, 240, (1, 1),297 None, None, (1, 2),298 None, None, None,299 640, 240, (1, 2))300 self._expect(240, 320, (1, 1),301 None, None, (1, 2),302 None, None, None,303 480, 320, (1, 2))304 self._expect(320, 240, (1, 1),305 None, None, (2, 1),306 None, None, None,307 160, 240, (2, 1))308 self._expect(240, 320, (1, 1),309 None, None, (2, 1),310 None, None, None,311 120, 320, (2, 1))312 def testPreserveHeightMethod(self):313 self._expect(320, 240, (1, 1),314 None, None, (1, 2),315 None, None, VideoScaleMethodEnum.height,316 640, 240, (1, 2))317 self._expect(320, 240, (1, 1),318 None, None, (1, 3),319 None, None, VideoScaleMethodEnum.height,320 960, 240, (1, 3))321 self._expect(320, 240, (1, 1),322 None, None, (2, 1),323 None, None, VideoScaleMethodEnum.height,324 160, 240, (2, 1))325 self._expect(320, 240, (1, 1),326 None, None, (3, 1),327 None, None, VideoScaleMethodEnum.height,328 107, 240, (3, 1))329 self._expect(240, 320, (1, 1),330 None, None, (1, 2),331 None, None, VideoScaleMethodEnum.height,332 480, 320, (1, 2))333 self._expect(240, 320, (1, 1),334 None, None, (1, 3),335 None, None, VideoScaleMethodEnum.height,336 720, 320, (1, 3))337 self._expect(240, 320, (1, 1),338 None, None, (2, 
1),339 None, None, VideoScaleMethodEnum.height,340 120, 320, (2, 1))341 self._expect(240, 320, (1, 1),342 None, None, (3, 1),343 None, None, VideoScaleMethodEnum.height,344 80, 320, (3, 1))345 self._expect(320, 240, (5, 4),346 None, None, (1, 2),347 None, None, VideoScaleMethodEnum.height,348 800, 240, (1, 2))349 self._expect(320, 240, (5, 4),350 None, None, (1, 3),351 None, None, VideoScaleMethodEnum.height,352 1200, 240, (1, 3))353 self._expect(320, 240, (5, 4),354 None, None, (2, 1),355 None, None, VideoScaleMethodEnum.height,356 200, 240, (2, 1))357 self._expect(320, 240, (5, 4),358 None, None, (3, 1),359 None, None, VideoScaleMethodEnum.height,360 133, 240, (3, 1))361 self._expect(240, 320, (5, 4),362 None, None, (1, 2),363 None, None, VideoScaleMethodEnum.height,364 600, 320, (1, 2))365 self._expect(240, 320, (5, 4),366 None, None, (1, 3),367 None, None, VideoScaleMethodEnum.height,368 900, 320, (1, 3))369 self._expect(240, 320, (5, 4),370 None, None, (2, 1),371 None, None, VideoScaleMethodEnum.height,372 150, 320, (2, 1))373 self._expect(240, 320, (5, 4),374 None, None, (3, 1),375 None, None, VideoScaleMethodEnum.height,376 100, 320, (3, 1))377 def testPreserveWidthMethod(self):378 self._expect(320, 240, (1, 1),379 None, None, (1, 2),380 None, None, VideoScaleMethodEnum.width,381 320, 120, (1, 2))382 self._expect(320, 240, (1, 1),383 None, None, (1, 3),384 None, None, VideoScaleMethodEnum.width,385 320, 80, (1, 3))386 self._expect(320, 240, (1, 1),387 None, None, (2, 1),388 None, None, VideoScaleMethodEnum.width,389 320, 480, (2, 1))390 self._expect(320, 240, (1, 1),391 None, None, (3, 1),392 None, None, VideoScaleMethodEnum.width,393 320, 720, (3, 1))394 self._expect(240, 320, (1, 1),395 None, None, (1, 2),396 None, None, VideoScaleMethodEnum.width,397 240, 160, (1, 2))398 self._expect(240, 320, (1, 1),399 None, None, (1, 3),400 None, None, VideoScaleMethodEnum.width,401 240, 107, (1, 3))402 self._expect(240, 320, (1, 1),403 None, None, (2, 1),404 None, 
None, VideoScaleMethodEnum.width,405 240, 640, (2, 1))406 self._expect(240, 320, (1, 1),407 None, None, (3, 1),408 None, None, VideoScaleMethodEnum.width,409 240, 960, (3, 1))410 self._expect(320, 240, (7, 8),411 None, None, (1, 2),412 None, None, VideoScaleMethodEnum.width,413 320, 137, (1, 2))414 self._expect(320, 240, (7, 8),415 None, None, (1, 3),416 None, None, VideoScaleMethodEnum.width,417 320, 91, (1, 3))418 self._expect(320, 240, (7, 8),419 None, None, (2, 1),420 None, None, VideoScaleMethodEnum.width,421 320, 549, (2, 1))422 self._expect(320, 240, (7, 8),423 None, None, (3, 1),424 None, None, VideoScaleMethodEnum.width,425 320, 823, (3, 1))426 self._expect(240, 320, (7, 8),427 None, None, (1, 2),428 None, None, VideoScaleMethodEnum.width,429 240, 183, (1, 2))430 self._expect(240, 320, (7, 8),431 None, None, (1, 3),432 None, None, VideoScaleMethodEnum.width,433 240, 122, (1, 3))434 self._expect(240, 320, (7, 8),435 None, None, (2, 1),436 None, None, VideoScaleMethodEnum.width,437 240, 731, (2, 1))438 self._expect(240, 320, (7, 8),439 None, None, (3, 1),440 None, None, VideoScaleMethodEnum.width,441 240, 1097, (3, 1))442 def testUpscaleMethod(self):443 self._expect(320, 240, (1, 1),444 None, None, (1, 2),445 None, None, VideoScaleMethodEnum.upscale,446 640, 240, (1, 2))447 self._expect(320, 240, (1, 1),448 None, None, (1, 3),449 None, None, VideoScaleMethodEnum.upscale,450 960, 240, (1, 3))451 self._expect(320, 240, (1, 1),452 None, None, (2, 1),453 None, None, VideoScaleMethodEnum.upscale,454 320, 480, (2, 1))455 self._expect(320, 240, (1, 1),456 None, None, (3, 1),457 None, None, VideoScaleMethodEnum.upscale,458 320, 720, (3, 1))459 self._expect(240, 320, (1, 1),460 None, None, (1, 2),461 None, None, VideoScaleMethodEnum.upscale,462 480, 320, (1, 2))463 self._expect(240, 320, (1, 1),464 None, None, (1, 3),465 None, None, VideoScaleMethodEnum.upscale,466 720, 320, (1, 3))467 self._expect(240, 320, (1, 1),468 None, None, (2, 1),469 None, None, 
VideoScaleMethodEnum.upscale,470 240, 640, (2, 1))471 self._expect(240, 320, (1, 1),472 None, None, (3, 1),473 None, None, VideoScaleMethodEnum.upscale,474 240, 960, (3, 1))475 self._expect(320, 240, (2, 3),476 None, None, (1, 2),477 None, None, VideoScaleMethodEnum.upscale,478 427, 240, (1, 2))479 self._expect(320, 240, (2, 3),480 None, None, (1, 3),481 None, None, VideoScaleMethodEnum.upscale,482 640, 240, (1, 3))483 self._expect(320, 240, (2, 3),484 None, None, (2, 1),485 None, None, VideoScaleMethodEnum.upscale,486 320, 720, (2, 1))487 self._expect(320, 240, (2, 3),488 None, None, (3, 1),489 None, None, VideoScaleMethodEnum.upscale,490 320, 1080, (3, 1))491 self._expect(240, 320, (2, 3),492 None, None, (1, 2),493 None, None, VideoScaleMethodEnum.upscale,494 320, 320, (1, 2))495 self._expect(240, 320, (2, 3),496 None, None, (1, 3),497 None, None, VideoScaleMethodEnum.upscale,498 480, 320, (1, 3))499 self._expect(240, 320, (2, 3),500 None, None, (2, 1),501 None, None, VideoScaleMethodEnum.upscale,502 240, 960, (2, 1))503 self._expect(240, 320, (2, 3),504 None, None, (3, 1),505 None, None, VideoScaleMethodEnum.upscale,506 240, 1440, (3, 1))507 def testDownscaleMethod(self):508 self._expect(320, 240, (1, 1),509 None, None, (1, 2),510 None, None, VideoScaleMethodEnum.downscale,511 320, 120, (1, 2))512 self._expect(320, 240, (1, 1),513 None, None, (1, 3),514 None, None, VideoScaleMethodEnum.downscale,515 320, 80, (1, 3))516 self._expect(320, 240, (1, 1),517 None, None, (2, 1),518 None, None, VideoScaleMethodEnum.downscale,519 160, 240, (2, 1))520 self._expect(320, 240, (1, 1),521 None, None, (3, 1),522 None, None, VideoScaleMethodEnum.downscale,523 107, 240, (3, 1))524 self._expect(240, 320, (1, 1),525 None, None, (1, 2),526 None, None, VideoScaleMethodEnum.downscale,527 240, 160, (1, 2))528 self._expect(240, 320, (1, 1),529 None, None, (1, 3),530 None, None, VideoScaleMethodEnum.downscale,531 240, 107, (1, 3))532 self._expect(240, 320, (1, 1),533 None, None, (2, 
1),534 None, None, VideoScaleMethodEnum.downscale,535 120, 320, (2, 1))536 self._expect(240, 320, (1, 1),537 None, None, (3, 1),538 None, None, VideoScaleMethodEnum.downscale,539 80, 320, (3, 1))540 self._expect(320, 240, (1, 4),541 None, None, (1, 2),542 None, None, VideoScaleMethodEnum.downscale,543 160, 240, (1, 2))544 self._expect(320, 240, (1, 4),545 None, None, (1, 3),546 None, None, VideoScaleMethodEnum.downscale,547 240, 240, (1, 3))548 self._expect(320, 240, (1, 4),549 None, None, (2, 1),550 None, None, VideoScaleMethodEnum.downscale,551 40, 240, (2, 1))552 self._expect(320, 240, (1, 4),553 None, None, (3, 1),554 None, None, VideoScaleMethodEnum.downscale,555 27, 240, (3, 1))556 self._expect(240, 320, (1, 4),557 None, None, (1, 2),558 None, None, VideoScaleMethodEnum.downscale,559 120, 320, (1, 2))560 self._expect(240, 320, (1, 4),561 None, None, (1, 3),562 None, None, VideoScaleMethodEnum.downscale,563 180, 320, (1, 3))564 self._expect(240, 320, (1, 4),565 None, None, (2, 1),566 None, None, VideoScaleMethodEnum.downscale,567 30, 320, (2, 1))568 self._expect(240, 320, (1, 4),569 None, None, (3, 1),570 None, None, VideoScaleMethodEnum.downscale,571 20, 320, (3, 1))572 def testWithoutPreferredSizeAndPAR(self):573 self._expect(320, 240, (1, 1),574 None, None, None,575 None, None, None,576 320, 240, (1, 1))577 self._expect(320, 240, (2, 1),578 None, None, None,579 None, None, None,580 320, 240, (2, 1))581 self._expect(320, 240, (3, 1),582 None, None, None,583 None, None, None,584 320, 240, (3, 1))585 self._expect(320, 240, (1, 2),586 None, None, None,587 None, None, None,588 320, 240, (1, 2))589 self._expect(320, 240, (1, 3),590 None, None, None,591 None, None, None,592 320, 240, (1, 3))593 self._expect(320, 100, (1, 1),594 None, None, None,595 None, None, None,596 320, 100, (1, 1))597 self._expect(100, 240, (1, 1),598 None, None, None,599 None, None, None,...
test_queryparser.py
Source:test_queryparser.py
...27 return (Splitter(),)28 def _makeLexicon(self):29 from zope.index.text.lexicon import Lexicon30 return Lexicon(*self._makePipeline())31 def _expect(self, parser, input, output, expected_ignored=[]):32 tree = parser.parseQuery(input)33 ignored = parser.getIgnored()34 self._compareParseTrees(tree, output)35 self.assertEqual(ignored, expected_ignored)36 # Check that parseQueryEx() == (parseQuery(), getIgnored())37 ex_tree, ex_ignored = parser.parseQueryEx(input)38 self._compareParseTrees(ex_tree, tree)39 self.assertEqual(ex_ignored, expected_ignored)40 def _failure(self, parser, input):41 from zope.index.text.parsetree import ParseError42 self.assertRaises(ParseError, parser.parseQuery, input)43 self.assertRaises(ParseError, parser.parseQueryEx, input)44 def _compareParseTrees(self, got, expected, msg=None):45 from zope.index.text.parsetree import AndNode46 from zope.index.text.parsetree import AtomNode47 from zope.index.text.parsetree import GlobNode48 from zope.index.text.parsetree import NotNode49 from zope.index.text.parsetree import OrNode50 from zope.index.text.parsetree import ParseTreeNode51 from zope.index.text.parsetree import PhraseNode52 if msg is None:53 msg = repr(got)54 self.assertEqual(isinstance(got, ParseTreeNode), 1)55 self.assertEqual(got.__class__, expected.__class__, msg)56 if isinstance(got, PhraseNode):57 self.assertEqual(got.nodeType(), "PHRASE", msg)58 self.assertEqual(got.getValue(), expected.getValue(), msg)59 elif isinstance(got, GlobNode):60 self.assertEqual(got.nodeType(), "GLOB", msg)61 self.assertEqual(got.getValue(), expected.getValue(), msg)62 elif isinstance(got, AtomNode):63 self.assertEqual(got.nodeType(), "ATOM", msg)64 self.assertEqual(got.getValue(), expected.getValue(), msg)65 elif isinstance(got, NotNode):66 self.assertEqual(got.nodeType(), "NOT")67 self._compareParseTrees(got.getValue(), expected.getValue(), msg)68 elif isinstance(got, AndNode) or isinstance(got, OrNode):69 self.assertEqual(got.nodeType(),70 
isinstance(got, AndNode) and "AND" or "OR", msg)71 list1 = got.getValue()72 list2 = expected.getValue()73 self.assertEqual(len(list1), len(list2), msg)74 for i in range(len(list1)):75 self._compareParseTrees(list1[i], list2[i], msg)76class TestQueryParser(TestQueryParserBase):77 def test_class_conforms_to_IQueryParser(self):78 from zope.interface.verify import verifyClass79 from zope.index.text.interfaces import IQueryParser80 verifyClass(IQueryParser, self._getTargetClass())81 def test_instance_conforms_to_IQueryParser(self):82 from zope.interface.verify import verifyObject83 from zope.index.text.interfaces import IQueryParser84 verifyObject(IQueryParser, self._makeOne())85 def test001(self):86 from zope.index.text.parsetree import AtomNode87 parser = self._makeOne()88 self._expect(parser, "foo", AtomNode("foo"))89 def test002(self):90 from zope.index.text.parsetree import AtomNode91 parser = self._makeOne()92 self._expect(parser, "note", AtomNode("note"))93 def test003(self):94 from zope.index.text.parsetree import AndNode95 from zope.index.text.parsetree import AtomNode96 parser = self._makeOne()97 self._expect(parser, "aa and bb AND cc",98 AndNode([AtomNode("aa"), AtomNode("bb"), AtomNode("cc")]))99 def test004(self):100 from zope.index.text.parsetree import OrNode101 from zope.index.text.parsetree import AtomNode102 parser = self._makeOne()103 self._expect(parser, "aa OR bb or cc",104 OrNode([AtomNode("aa"), AtomNode("bb"), AtomNode("cc")]))105 def test005(self):106 from zope.index.text.parsetree import AndNode107 from zope.index.text.parsetree import AtomNode108 from zope.index.text.parsetree import OrNode109 parser = self._makeOne()110 self._expect(parser, "aa AND bb OR cc AnD dd",111 OrNode([AndNode([AtomNode("aa"), AtomNode("bb")]),112 AndNode([AtomNode("cc"), AtomNode("dd")])]))113 def test006(self):114 from zope.index.text.parsetree import AndNode115 from zope.index.text.parsetree import AtomNode116 from zope.index.text.parsetree import OrNode117 parser 
= self._makeOne()118 self._expect(parser, "(aa OR bb) AND (cc OR dd)",119 AndNode([OrNode([AtomNode("aa"), AtomNode("bb")]),120 OrNode([AtomNode("cc"), AtomNode("dd")])]))121 def test007(self):122 from zope.index.text.parsetree import AndNode123 from zope.index.text.parsetree import AtomNode124 from zope.index.text.parsetree import NotNode125 parser = self._makeOne()126 self._expect(parser, "aa AND NOT bb",127 AndNode([AtomNode("aa"), NotNode(AtomNode("bb"))]))128 def test008(self):129 from zope.index.text.parsetree import AndNode130 from zope.index.text.parsetree import AtomNode131 from zope.index.text.parsetree import NotNode132 parser = self._makeOne()133 self._expect(parser, "aa NOT bb",134 AndNode([AtomNode("aa"), NotNode(AtomNode("bb"))]))135 def test010(self):136 from zope.index.text.parsetree import PhraseNode137 parser = self._makeOne()138 self._expect(parser, '"foo bar"', PhraseNode(["foo", "bar"]))139 def test011(self):140 from zope.index.text.parsetree import AndNode141 from zope.index.text.parsetree import AtomNode142 parser = self._makeOne()143 self._expect(parser, "foo bar",144 AndNode([AtomNode("foo"), AtomNode("bar")]))145 def test012(self):146 from zope.index.text.parsetree import PhraseNode147 parser = self._makeOne()148 self._expect(parser, '(("foo bar"))"', PhraseNode(["foo", "bar"]))149 def test013(self):150 from zope.index.text.parsetree import AndNode151 from zope.index.text.parsetree import AtomNode152 parser = self._makeOne()153 self._expect(parser, "((foo bar))",154 AndNode([AtomNode("foo"), AtomNode("bar")]))155 def test014(self):156 from zope.index.text.parsetree import PhraseNode157 parser = self._makeOne()158 self._expect(parser, "foo-bar", PhraseNode(["foo", "bar"]))159 def test015(self):160 from zope.index.text.parsetree import AndNode161 from zope.index.text.parsetree import AtomNode162 from zope.index.text.parsetree import NotNode163 parser = self._makeOne()164 self._expect(parser, "foo -bar", AndNode([AtomNode("foo"),165 
NotNode(AtomNode("bar"))]))166 def test016(self):167 from zope.index.text.parsetree import AndNode168 from zope.index.text.parsetree import AtomNode169 from zope.index.text.parsetree import NotNode170 parser = self._makeOne()171 self._expect(parser, "-foo bar", AndNode([AtomNode("bar"),172 NotNode(AtomNode("foo"))]))173 def test017(self):174 from zope.index.text.parsetree import AndNode175 from zope.index.text.parsetree import AtomNode176 from zope.index.text.parsetree import NotNode177 from zope.index.text.parsetree import PhraseNode178 parser = self._makeOne()179 self._expect(parser, "booh -foo-bar",180 AndNode([AtomNode("booh"),181 NotNode(PhraseNode(["foo", "bar"]))]))182 def test018(self):183 from zope.index.text.parsetree import AndNode184 from zope.index.text.parsetree import AtomNode185 from zope.index.text.parsetree import NotNode186 from zope.index.text.parsetree import PhraseNode187 parser = self._makeOne()188 self._expect(parser, 'booh -"foo bar"',189 AndNode([AtomNode("booh"),190 NotNode(PhraseNode(["foo", "bar"]))]))191 def test019(self):192 from zope.index.text.parsetree import AndNode193 from zope.index.text.parsetree import AtomNode194 parser = self._makeOne()195 self._expect(parser, 'foo"bar"',196 AndNode([AtomNode("foo"), AtomNode("bar")]))197 def test020(self):198 from zope.index.text.parsetree import AndNode199 from zope.index.text.parsetree import AtomNode200 parser = self._makeOne()201 self._expect(parser, '"foo"bar',202 AndNode([AtomNode("foo"), AtomNode("bar")]))203 def test021(self):204 from zope.index.text.parsetree import AndNode205 from zope.index.text.parsetree import AtomNode206 parser = self._makeOne()207 self._expect(parser, 'foo"bar"blech',208 AndNode([AtomNode("foo"), AtomNode("bar"),209 AtomNode("blech")]))210 def test022(self):211 from zope.index.text.parsetree import GlobNode212 parser = self._makeOne()213 self._expect(parser, "foo*", GlobNode("foo*"))214 def test023(self):215 from zope.index.text.parsetree import AndNode216 
from zope.index.text.parsetree import AtomNode217 from zope.index.text.parsetree import GlobNode218 parser = self._makeOne()219 self._expect(parser, "foo* bar",220 AndNode([GlobNode("foo*"), AtomNode("bar")]))221 def test101(self):222 parser = self._makeOne()223 self._failure(parser, "")224 def test102(self):225 parser = self._makeOne()226 self._failure(parser, "not")227 def test103(self):228 parser = self._makeOne()229 self._failure(parser, "or")230 def test104(self):231 parser = self._makeOne()232 self._failure(parser, "and")233 def test105(self):234 parser = self._makeOne()235 self._failure(parser, "NOT")236 def test106(self):237 parser = self._makeOne()238 self._failure(parser, "OR")239 def test107(self):240 parser = self._makeOne()241 self._failure(parser, "AND")242 def test108(self):243 parser = self._makeOne()244 self._failure(parser, "NOT foo")245 def test109(self):246 parser = self._makeOne()247 self._failure(parser, ")")248 def test110(self):249 parser = self._makeOne()250 self._failure(parser, "(")251 def test111(self):252 parser = self._makeOne()253 self._failure(parser, "foo OR")254 def test112(self):255 parser = self._makeOne()256 self._failure(parser, "foo AND")257 def test113(self):258 parser = self._makeOne()259 self._failure(parser, "OR foo")260 def test114(self):261 parser = self._makeOne()262 self._failure(parser, "AND foo")263 def test115(self):264 parser = self._makeOne()265 self._failure(parser, "(foo) bar")266 def test116(self):267 parser = self._makeOne()268 self._failure(parser, "(foo OR)")269 def test117(self):270 parser = self._makeOne()271 self._failure(parser, "(foo AND)")272 def test118(self):273 parser = self._makeOne()274 self._failure(parser, "(NOT foo)")275 def test119(self):276 parser = self._makeOne()277 self._failure(parser, "-foo")278 def test120(self):279 parser = self._makeOne()280 self._failure(parser, "-foo -bar")281 def test121(self):282 parser = self._makeOne()283 self._failure(parser, "foo OR -bar")284 def 
test122(self):285 parser = self._makeOne()286 self._failure(parser, "foo AND -bar")287class StopWordTestQueryParser(TestQueryParserBase):288 def _makePipeline(self):289 from zope.index.text.lexicon import Splitter290 return (Splitter(), FakeStopWordRemover())291 def test201(self):292 from zope.index.text.parsetree import AtomNode293 parser = self._makeOne()294 self._expect(parser, 'and/', AtomNode("and"))295 def test202(self):296 from zope.index.text.parsetree import AtomNode297 parser = self._makeOne()298 self._expect(parser, 'foo AND stop', AtomNode("foo"), ["stop"])299 def test203(self):300 from zope.index.text.parsetree import AtomNode301 parser = self._makeOne()302 self._expect(parser, 'foo AND NOT stop', AtomNode("foo"), ["stop"])303 def test204(self):304 from zope.index.text.parsetree import AtomNode305 parser = self._makeOne()306 self._expect(parser, 'stop AND foo', AtomNode("foo"), ["stop"])307 def test205(self):308 from zope.index.text.parsetree import AtomNode309 parser = self._makeOne()310 self._expect(parser, 'foo OR stop', AtomNode("foo"), ["stop"])311 def test206(self):312 from zope.index.text.parsetree import AtomNode313 parser = self._makeOne()314 self._expect(parser, 'stop OR foo', AtomNode("foo"), ["stop"])315 def test207(self):316 from zope.index.text.parsetree import AndNode317 from zope.index.text.parsetree import AtomNode318 parser = self._makeOne()319 self._expect(parser, 'foo AND bar NOT stop',320 AndNode([AtomNode("foo"), AtomNode("bar")]), ["stop"])321 def test301(self):322 parser = self._makeOne()323 self._failure(parser, 'stop')324 def test302(self):325 parser = self._makeOne()326 self._failure(parser, 'stop stop')327 def test303(self):328 parser = self._makeOne()329 self._failure(parser, 'stop AND stop')330 def test304(self):331 parser = self._makeOne()332 self._failure(parser, 'stop OR stop')333 def test305(self):...
test_type_comparable.py
Source:test_type_comparable.py
1from collections import UserDict, UserList2from sys import maxsize3from typing import Any, Optional4import pytest5from type_comparable import make_type_comparable6int_comparable_types = [int, object, Any, Optional]7str_comparable_types = [str, object, Any, Optional]8bool_comparable_types = [bool, object, Any, Optional]9list_comparable_types = [list, object, Any, Optional]10dict_comparable_types = [dict, object, Any, Optional]11def assert_left_and_rigth(v1, v2, op):12 if op == '==':13 assert v1 == v214 assert v2 == v115 elif op == '!=':16 assert v1 != v217 assert v2 != v118 else:19 assert False, 'invalid op ``'.format(op)20class TestSimple:21 22 def test_none(self):23 assert_left_and_rigth(make_type_comparable(None), None, '==')24 assert make_type_comparable(None) == make_type_comparable(None)25 def test_none_not_equal_any(self):26 assert not (make_type_comparable(None) == Any)27 assert not (Any == make_type_comparable(None))28 def test_none_not_equal_object(self):29 assert not (make_type_comparable(None) == object)30 assert not (object == make_type_comparable(None))31 def test_none_equal_optional(self):32 assert_left_and_rigth(make_type_comparable(None), Optional, '==')33 # TODO:34 # def test_optional_type(self):35 # assert make_type_comparable(1) == Optional[int]36 @pytest.mark.parametrize('value', [0, 1, -1, maxsize, -maxsize])37 @pytest.mark.parametrize('compare_with', int_comparable_types)38 def test_int(self, value, compare_with):39 assert value != compare_with40 obj = make_type_comparable(value)41 assert isinstance(obj, int)42 assert_left_and_rigth(obj, value, '==')43 assert_left_and_rigth(obj, compare_with, '==')44 assert_left_and_rigth(obj, value + 1, '!=')45 assert_left_and_rigth(obj, None, '!=')46 assert obj == make_type_comparable(value)47 assert_left_and_rigth(48 value, make_type_comparable(compare_with), '=='49 )50 @pytest.mark.parametrize('value', [True, False])51 @pytest.mark.parametrize('compare_with', bool_comparable_types)52 def test_bool(self, 
value, compare_with):53 assert value != compare_with54 obj = make_type_comparable(value)55 assert isinstance(obj, bool)56 assert_left_and_rigth(obj, value, '==')57 assert_left_and_rigth(obj, compare_with, '==')58 assert_left_and_rigth(obj, not value, '!=')59 assert_left_and_rigth(obj, None, '!=')60 assert obj == make_type_comparable(value)61 assert_left_and_rigth(62 value, make_type_comparable(compare_with), '=='63 )64 @pytest.mark.parametrize('value', ['', 'a', 'aaaaaaaaaaaaaaaaaaaaaaaaaaa'])65 @pytest.mark.parametrize('compare_with', str_comparable_types)66 def test_str(self, value, compare_with):67 assert value != compare_with68 obj = make_type_comparable(value)69 assert isinstance(obj, str)70 assert_left_and_rigth(obj, value, '==')71 assert_left_and_rigth(obj, compare_with, '==')72 assert_left_and_rigth(obj, '{}-other'.format(value), '!=')73 assert_left_and_rigth(obj, None, '!=')74 assert obj == make_type_comparable(value)75 assert_left_and_rigth(76 value, make_type_comparable(compare_with), '=='77 )78 @pytest.mark.parametrize('value', [[], [1, ], [1, 2, 3]])79 @pytest.mark.parametrize('compare_with', list_comparable_types)80 def test_list(self, value, compare_with):81 assert value != compare_with82 obj = make_type_comparable(value)83 assert isinstance(obj, UserList)84 assert_left_and_rigth(obj, value, '==')85 assert_left_and_rigth(obj, compare_with, '==')86 assert_left_and_rigth(obj, value + [0], '!=')87 assert_left_and_rigth(obj, None, '!=')88 assert obj == make_type_comparable(value)89 assert_left_and_rigth(90 value, make_type_comparable(compare_with), '=='91 )92 @pytest.mark.parametrize('value', [{}, {'a': 'A'}])93 @pytest.mark.parametrize('compare_with', dict_comparable_types)94 def test_dict(self, value, compare_with):95 assert value != compare_with96 obj = make_type_comparable(value)97 assert isinstance(obj, UserDict)98 assert_left_and_rigth(obj, value, '==')99 assert_left_and_rigth(obj, compare_with, '==')100 _expect = value.copy()101 
_expect.update({'other_key': 'other_value'})102 assert_left_and_rigth(obj, _expect, '!=')103 assert obj != None104 assert_left_and_rigth(obj, None, '!=')105 assert obj == make_type_comparable(value)106 assert_left_and_rigth(107 value, make_type_comparable(compare_with), '=='108 )109class TestDictValues:110 @staticmethod111 def make_simple_dict():112 return {113 'int': 1,114 'str': 'value str',115 'bool': True,116 'none': None,117 'dict': {},118 'list': [],119 'additional key': 'additional value'120 }121 @classmethod122 def make_complex_dict(cls):123 d = cls.make_simple_dict()124 d['dict'] = cls.make_simple_dict()125 return d126 @pytest.mark.parametrize('field_int', int_comparable_types)127 @pytest.mark.parametrize('field_str', str_comparable_types)128 @pytest.mark.parametrize('field_bool', bool_comparable_types)129 @pytest.mark.parametrize('field_dict', dict_comparable_types)130 @pytest.mark.parametrize('field_list', list_comparable_types)131 def test_values(132 self, field_int, field_str, field_bool, field_dict, field_list133 ):134 dct = self.make_simple_dict()135 expect = self.make_simple_dict()136 expect['int'] = field_int137 expect['str'] = field_str138 expect['bool'] = field_bool139 expect['dict'] = field_dict140 expect['list'] = field_list141 assert dct != expect142 obj = make_type_comparable(dct)143 assert isinstance(obj, UserDict)144 assert_left_and_rigth(obj, expect, '==')145 assert_left_and_rigth(obj['int'], field_int, '==')146 assert_left_and_rigth(obj['str'], field_str, '==')147 assert_left_and_rigth(obj['bool'], field_bool, '==')148 assert_left_and_rigth(obj['dict'], field_dict, '==')149 assert_left_and_rigth(obj['list'], field_list, '==')150 _expect = expect.copy()151 _expect.update({'additional key': 'changed'})152 assert_left_and_rigth(obj, _expect, '!=')153 _expect = expect.copy()154 _expect.update({'new key': 'new value'})155 assert_left_and_rigth(obj, _expect, '!=')156 @pytest.mark.parametrize('field_int', int_comparable_types)157 
@pytest.mark.parametrize('field_str', str_comparable_types)158 @pytest.mark.parametrize('field_bool', bool_comparable_types)159 @pytest.mark.parametrize('field_dict', dict_comparable_types)160 @pytest.mark.parametrize('field_list', list_comparable_types)161 def test_sub_dict_values(162 self, field_int, field_str, field_bool, field_dict, field_list163 ):164 dct = self.make_complex_dict()165 expect = self.make_complex_dict()166 expect['dict']['int'] = field_int167 expect['dict']['str'] = field_str168 expect['dict']['bool'] = field_bool169 expect['dict']['dict'] = field_dict170 expect['dict']['list'] = field_list171 assert dct != expect172 obj = make_type_comparable(dct)173 assert isinstance(obj, UserDict)174 assert_left_and_rigth(obj, expect, '==')175 assert_left_and_rigth(obj['dict'], expect['dict'], '==')176 assert_left_and_rigth(obj['dict']['int'], field_int, '==')177 assert_left_and_rigth(obj['dict']['str'], field_str, '==')178 assert_left_and_rigth(obj['dict']['bool'], field_bool, '==')179 assert_left_and_rigth(obj['dict']['dict'], field_dict, '==')180 assert_left_and_rigth(obj['dict']['list'], field_list, '==')181 _expect = expect.copy()182 _expect.update({'additional key': 'changed'})183 assert_left_and_rigth(obj['dict'], _expect, '!=')184 _expect = expect.copy()185 _expect.update({'new key': 'new value'})186 assert_left_and_rigth(obj['dict'], _expect, '!=')187 _expect = expect.copy()188 _expect_dict = expect['dict'].copy()189 _expect_dict.update({'additional key': 'changed'})190 _expect.update({'dict': _expect_dict})191 assert_left_and_rigth(obj, _expect, '!=')192 assert_left_and_rigth(obj['dict'], _expect['dict'], '!=')193 _expect = expect.copy()194 _expect_dict = expect['dict']195 _expect_dict.update({'new key': 'new value'})196 _expect.update({'dict': _expect_dict})197 assert_left_and_rigth(obj['dict'], _expect['dict'], '!=')198class TestListValues:199 @staticmethod200 def make_simple_list():201 return [1, 'value str', True, None, {}, [], 'additional 
value']202 @classmethod203 def make_complex_list(cls):204 l = cls.make_simple_list()205 l.append(cls.make_simple_list())206 return l207 @pytest.mark.parametrize('field_int', int_comparable_types)208 @pytest.mark.parametrize('field_str', str_comparable_types)209 @pytest.mark.parametrize('field_bool', bool_comparable_types)210 @pytest.mark.parametrize('field_dict', dict_comparable_types)211 @pytest.mark.parametrize('field_list', list_comparable_types)212 def test_values(213 self, field_int, field_str, field_bool, field_dict, field_list214 ):215 dct = self.make_simple_list()216 expect = self.make_simple_list()217 expect[0] = field_int218 expect[1] = field_str219 expect[2] = field_bool220 expect[4] = field_dict221 expect[5] = field_list222 assert dct != expect223 obj = make_type_comparable(dct)224 assert isinstance(obj, UserList)225 assert_left_and_rigth(obj, expect, '==')226 assert_left_and_rigth(obj[0], field_int, '==')227 assert_left_and_rigth(obj[1], field_str, '==')228 assert_left_and_rigth(obj[2], field_bool, '==')229 assert_left_and_rigth(obj[4], field_dict, '==')230 assert_left_and_rigth(obj[5], field_list, '==')231 _expect = expect.copy()232 _expect[6] = 'changed'233 assert_left_and_rigth(obj,_expect, '!=')234 assert_left_and_rigth(obj, expect + ['new value'], '!=')235 @pytest.mark.parametrize('field_int', int_comparable_types)236 @pytest.mark.parametrize('field_str', str_comparable_types)237 @pytest.mark.parametrize('field_bool', bool_comparable_types)238 @pytest.mark.parametrize('field_dict', dict_comparable_types)239 @pytest.mark.parametrize('field_list', list_comparable_types)240 def test_inner_list_values(241 self, field_int, field_str, field_bool, field_dict, field_list242 ):243 dct = self.make_complex_list()244 expect = self.make_complex_list()245 expect[7][0] = field_int246 expect[7][1] = field_str247 expect[7][2] = field_bool248 expect[7][4] = field_dict249 expect[7][5] = field_list250 assert dct != expect251 obj = make_type_comparable(dct)252 assert 
isinstance(obj, UserList)253 assert obj == expect254 assert_left_and_rigth(obj[7][0], field_int, '==')255 assert_left_and_rigth(obj[7][1], field_str, '==')256 assert_left_and_rigth(obj[7][2], field_bool, '==')257 assert_left_and_rigth(obj[7][4], field_dict, '==')258 assert_left_and_rigth(obj[7][5], field_list, '==')259 _expect = expect.copy()260 _expect[7][6] = 'changed'261 assert_left_and_rigth(obj, _expect, '!=')...
parser.py
Source:parser.py
...6class Parser:7 def __init__(self, lexer: Lexer):8 self.lexer = lexer9 self.current = lexer.next_token()10 def _expect(self, expected: List[str]) -> None:11 if self.current.domain not in expected:12 row, column = self.current.coords13 raise ParseError(f"({row}, {column}): expected '{', '.join(expected)}', got '{self.current.value}'")14 def _next(self) -> None:15 self.current = self.lexer.next_token()16 # Command ::= '(' Insert | Query ')'17 def parse_command(self) -> AST:18 self._expect([token.LEFT_PAREN])19 self._next()20 ast = self._parse_insert() if self.current.domain == token.NEW_KEYWORD else self._parse_query()21 self._expect([token.RIGHT_PAREN])22 self._next()23 self._expect([token.EOF_DOMAIN])24 return ast25 # Insert ::= '@new' Entity+26 def _parse_insert(self) -> AST:27 self._expect([token.NEW_KEYWORD])28 ast: AST = [token_to_atom(self.current)]29 self._next()30 self._expect([token.LEFT_PAREN])31 while self.current.domain == token.LEFT_PAREN:32 ast.append(self._parse_entity())33 return ast34 # Entity ::= '(' Assertion | Rule ')'35 def _parse_entity(self) -> AST:36 self._expect([token.LEFT_PAREN])37 self._next()38 ast = self._parse_rule() if self.current.domain == token.RULE_KEYWORD else self._parse_assertion()39 self._expect([token.RIGHT_PAREN])40 self._next()41 return ast42 # Assertion ::= ('(' Assertion ')' | Word | Number)*43 def _parse_assertion(self) -> AST:44 expected_domains = [token.LEFT_PAREN, token.WORD_DOMAIN, token.NUMBER_DOMAIN]45 ast: AST = []46 while self.current.domain in expected_domains:47 if self.current.domain == token.LEFT_PAREN:48 self._next()49 ast.append(self._parse_assertion())50 self._expect([token.RIGHT_PAREN])51 else:52 ast.append(token_to_atom(self.current))53 self._next()54 return ast55 # Rule ::= '@rule' '(' SimpleQuery ')' ('(' Query ')')?56 def _parse_rule(self) -> AST:57 self._expect([token.RULE_KEYWORD])58 ast: AST = [token_to_atom(self.current)]59 self._next()60 self._expect([token.LEFT_PAREN])61 self._next()62 
ast.append(self._parse_simple_query())63 self._expect([token.RIGHT_PAREN])64 self._next()65 if self.current.domain == token.LEFT_PAREN:66 self._next()67 ast.append(self._parse_query())68 self._expect([token.RIGHT_PAREN])69 self._next()70 return ast71 # Query ::= SimpleQuery | AndQuery | OrQuery | NotQuery72 def _parse_query(self) -> AST:73 if self.current.domain == token.AND_KEYWORD:74 return self._parse_and_query()75 if self.current.domain == token.OR_KEYWORD:76 return self._parse_or_query()77 if self.current.domain == token.NOT_KEYWORD:78 return self._not_query()79 return self._parse_simple_query()80 # AndQuery ::= '@and' InnerQueries81 def _parse_and_query(self) -> AST:82 self._expect([token.AND_KEYWORD])83 ast: AST = [token_to_atom(self.current)]84 self._next()85 return [*ast, *self._parse_inner_queries()]86 # OrQuery ::= '@or' InnerQueries87 def _parse_or_query(self) -> AST:88 self._expect([token.OR_KEYWORD])89 ast = [token_to_atom(self.current)]90 self._next()91 return [*ast, *self._parse_inner_queries()]92 # NotQuery ::= '@not' InnerQuery93 def _not_query(self) -> AST:94 self._expect([token.NOT_KEYWORD])95 ast: AST = [token_to_atom(self.current)]96 self._next()97 ast.append(self._parse_inner_query())98 return ast99 # InnerQueries ::= InnerQuery+100 def _parse_inner_queries(self) -> AST:101 self._expect([token.LEFT_PAREN])102 ast: AST = []103 while self.current.domain == token.LEFT_PAREN:104 ast.append(self._parse_inner_query())105 return ast106 # InnerQuery ::= '(' Query | Apply ')'107 def _parse_inner_query(self) -> AST:108 self._expect([token.LEFT_PAREN])109 self._next()110 ast = self._parse_apply() if self.current.domain == token.APPLY_KEYWORD else self._parse_query()111 self._expect([token.RIGHT_PAREN])112 self._next()113 return ast114 # Apply ::= '@apply' Predicate ApplyArguments115 # Predicate ::= '<' | '>' | Word116 # ApplyArguments ::= (Var | Word | Number)+117 def _parse_apply(self) -> AST:118 self._expect([token.APPLY_KEYWORD])119 ast: AST = 
[token_to_atom(self.current)]120 self._next()121 self._expect([token.LESS_OP, token.GREATER_OP, token.WORD_DOMAIN])122 ast.append(token_to_atom(self.current))123 self._next()124 expected_domains = [token.VAR_DOMAIN, token.WORD_DOMAIN, token.NUMBER_DOMAIN]125 self._expect(expected_domains)126 while self.current.domain in expected_domains:127 ast.append(token_to_atom(self.current))128 self._next()129 return ast130 # SimpleQuery ::= ('(' SimpleQuery ')' | Var | Word | Number)* ('.' Var)?131 def _parse_simple_query(self) -> AST:132 expected_domains = [token.LEFT_PAREN, token.VAR_DOMAIN, token.WORD_DOMAIN, token.NUMBER_DOMAIN]133 ast: AST = []134 while self.current.domain in expected_domains:135 if self.current.domain == token.LEFT_PAREN:136 self._next()137 ast.append(self._parse_simple_query())138 self._expect([token.RIGHT_PAREN])139 else:140 ast.append(token_to_atom(self.current))141 self._next()142 if self.current.domain == token.DOT:143 ast.append(token_to_atom(self.current))144 self._next()145 self._expect([token.VAR_DOMAIN])146 ast.append(token_to_atom(self.current))147 self._next()...
common_catch_and_log.py
Source:common_catch_and_log.py
...24 self._expected = []25 def test_simple(self):26 complaint = 'No cheese, Gromit!'27 self._catch_and_log.critical(complaint)28 self._expect(complaint, catch_and_log._CRITICAL)29 self._assert_logs_match()30 def test_multiple(self):31 complaints = [32 'Failed to find cheese in pantry',33 'Failed to install cheese to mousetrap',34 'Failed to arm trap',35 'Failed to catch mouse'36 ]37 for complaint in complaints:38 self._catch_and_log.critical(complaint)39 self._expect(complaint, catch_and_log._CRITICAL)40 self._assert_logs_match()41 def test_multiple_levels(self):42 complaint = 'No cheese, Gromit!'43 self._catch_and_log.critical(complaint)44 self._expect(complaint, catch_and_log._CRITICAL)45 complaint = 'Moon out of range!'46 self._catch_and_log.warn(complaint)47 self._expect(complaint, catch_and_log._WARNING)48 complaint = 'Parking brake engaged!'49 self._catch_and_log.warning(complaint)50 self._expect(complaint, catch_and_log._WARNING)51 complaint = 'Five mice spectating'52 self._catch_and_log.info(complaint)53 self._expect(complaint, catch_and_log._INFO)54 complaint = 'Red light blinking'55 self._catch_and_log.info(complaint)56 self._expect(complaint, catch_and_log._INFO)57 complaint = 'Low blinker fluid'58 self._catch_and_log.warning(complaint)59 self._expect(complaint, catch_and_log._WARNING)60 complaint = 'Insufficient fuel for landing!'61 self._catch_and_log.critical(complaint)62 self._expect(complaint, catch_and_log._CRITICAL)63 self._assert_logs_match()64 def test_exception_suppressed(self):65 topic = 'Entering Orbit'66 complaint = 'Perigee below surface of Moon!'67 with self._catch_and_log.consume_exceptions(topic):68 raise ValueError(complaint)69 self._expect(70 '%s: ValueError: %s at ' % (topic, complaint) +71 'File "/tests/unit/common_catch_and_log.py", line 86, in '72 'test_exception_suppressed\n raise ValueError(complaint)\n',73 catch_and_log._CRITICAL)74 self._assert_logs_match()75 def test_exception_propagates(self):76 topic = 'Entering Orbit'77 
complaint = 'Perigee below surface of Moon!'78 with self.assertRaises(ValueError):79 with self._catch_and_log.propagate_exceptions(topic):80 raise ValueError(complaint)81 self._expect(82 '%s: ValueError: %s at ' % (topic, complaint) +83 'File "/tests/unit/common_catch_and_log.py", line 100, in '84 'test_exception_propagates\n raise ValueError(complaint)\n',85 catch_and_log._CRITICAL)86 self._assert_logs_match()87 def test_traceback_info_suppressed_in_production(self):88 appengine_config.PRODUCTION_MODE = True89 topic = 'Entering Orbit'90 complaint = 'Perigee below surface of Moon!'91 with self._catch_and_log.consume_exceptions(topic):92 raise ValueError(complaint)93 self._expect('%s: ValueError: %s' % (topic, complaint),94 catch_and_log._CRITICAL)95 self._assert_logs_match()96 def _expect(self, message, level):97 self._expected.append({98 'message': message,99 'level': level,100 'timestamp': datetime.datetime.now().strftime(101 catch_and_log._LOG_DATE_FORMAT)})102 def _assert_logs_match(self):103 if len(self._expected) != len(self._catch_and_log.get()):104 self.fail('Expected %d entries, but have %d' %105 (len(self._expected), len(self._catch_and_log.get())))106 for expected, actual in zip(self._expected, self._catch_and_log.get()):107 self.assertEquals(expected['level'], actual['level'])108 self.assertEquals(expected['message'], actual['message'])109 expected_time = datetime.datetime.strptime(110 expected['timestamp'], catch_and_log._LOG_DATE_FORMAT)...
Using AI Code Generation
1const { _expect } = require('@playwright/test');2const { expect } = require('chai');3const { expect } = require('@playwright/test');4const { expect } = require('jasmine');5const { expect } = require('mocha');6const { expect } = require('@playwright/test');7const { expect } = require('jasmine');8const { expect } = require('mocha');9const { expect } = require('@playwright/test');10const { expect } = require('jasmine');11const { expect } = require('mocha');12const { expect } = require('@playwright/test');13const { expect } = require('jasmine');14const { expect } = require('mocha');15const { expect } = require('@playwright/test');16const { expect } = require('jasmine');17const { expect } = require('mocha');18const { expect } = require('@playwright/test');19const { expect } = require('jasmine');20const { expect } = require('mocha');21const { expect } = require('@playwright/test');22const { expect } = require('jasmine');23const { expect } = require('mocha');
Using AI Code Generation
1const expect = require('expect-playwright');2const { test, expect } = require('@playwright/test');3test('first test', async ({ page }) => {4 const title = page.locator('text=Get started');5 await expect(title).toBeVisible();6});7const expect = require('expect-playwright');8const { test, expect } = require('@playwright/test');9test('first test', async ({ page }) => {10 const title = page.locator('text=Get started');11 await expect(title).toBeVisible();12});13const { test, expect } = require('@playwright/test');14const expect = require('expect-playwright');15const { test, expect } = require('@playwright/test');16const expect = require('expect-playwright');
Using AI Code Generation
1const { _expect } = require("@playwright/test");2const { expect } = require("@playwright/test");3const chai = require("chai");4const chaiExpect = chai.expect;5const jestExpect = require("expect");6const jasmineExpect = require("expect");7const mochaExpect = require("expect.js");8const cucumberExpect = require("chai").expect;9const jestExpect2 = require("expect");10const jasmineExpect2 = require("expect");11const mochaExpect2 = require("expect.js");12const cucumberExpect2 = require("chai").expect;13const jestExpect3 = require("expect");14const jasmineExpect3 = require("expect");15const mochaExpect3 = require("expect.js");16const cucumberExpect3 = require("chai").expect;17const jestExpect4 = require("expect");18const jasmineExpect4 = require("expect");19const mochaExpect4 = require("expect.js");20const cucumberExpect4 = require("chai").expect;21const jestExpect5 = require("expect");22const jasmineExpect5 = require("expect");23const mochaExpect5 = require("expect.js");24const cucumberExpect5 = require("chai").expect;25const jestExpect6 = require("expect");26const jasmineExpect6 = require("expect");27const mochaExpect6 = require("expect.js");28const cucumberExpect6 = require("chai").expect;29const jestExpect7 = require("expect");
Using AI Code Generation
1const expect = require('@playwright/test').expect;2test('should be true', async ({ page }) => {3 const title = await page.title();4 expect(title).toBe('Playwright');5});6const { test } = require('@playwright/test');7test('should be true', async ({ page }) => {8 const title = await page.title();9 expect(title).toBe('Playwright');10});11const { test, expect } = require('@playwright/test');12test('should be true', async ({ page }) => {13 const title = await page.title();14 expect(title).toBe('Playwright');15});16const { test, expect, config } = require('@playwright/test');17test('should be true', async ({ page }) => {18 const title = await page.title();19 expect(title).toBe('Playwright');20});21const { test, expect, config, params } = require('@playwright/test');22test('should be true', async ({ page }) => {
Using AI Code Generation
// Assertion-related bindings from several test libraries.
// Each name is declared exactly once: redeclaring a `const` binding in the
// same scope is a SyntaxError, and repeated `require` calls return the same
// cached module anyway.
const { expect } = require('@playwright/test');
const chai = require('chai');
const expectChai = chai.expect;
const expectJest = require('expect');
// NOTE(review): these two bind the whole `jasmine` / `mocha` module export,
// not an `expect` function; confirm that is what callers want.
const expectJasmine = require('jasmine');
const expectMocha = require('mocha');
Using AI Code Generation
// NOTE(review): `_expect` is not listed in the public @playwright/test API;
// confirm it is actually exported before relying on it.
const { _expect } = require('@playwright/test');

// chai's `expect`, bound as `expect` and as the numbered aliases
// `expect2` … `expect23`; every alias is the same function.
const {
  expect,
  expect: expect2,
  expect: expect3,
  expect: expect4,
  expect: expect5,
  expect: expect6,
  expect: expect7,
  expect: expect8,
  expect: expect9,
  expect: expect10,
  expect: expect11,
  expect: expect12,
  expect: expect13,
  expect: expect14,
  expect: expect15,
  expect: expect16,
  expect: expect17,
  expect: expect18,
  expect: expect19,
  expect: expect20,
  expect: expect21,
  expect: expect22,
  expect: expect23,
} = require('chai');
Using AI Code Generation
1const { _expect } = require('playwright');2const { expect } = require('chai');3test('Test 1', async ({ page }) => {4 await _expect(page).toHaveTitle('Playwright');5 expect(await page.title()).to.be.equal('Playwright');6});7test('Test 2', async ({ page }) => {8 await _expect(page).toHaveTitle('Playwright');9 expect(await page.title()).to.be.equal('Playwright');10});11test('Test 3', async ({ page }) => {12 await _expect(page).toHaveTitle('Playwright');13 expect(await page.title()).to.be.equal('Playwright');14});15test('Test 4', async ({ page }) => {16 await _expect(page).toHaveTitle('Playwright');17 expect(await page.title()).to.be.equal('Playwright');18});19test('Test 5', async ({ page }) => {20 await _expect(page).toHaveTitle('Playwright');21 expect(await page.title()).to.be.equal('Playwright');22});23test('Test 6', async ({ page }) => {24 await _expect(page).toHaveTitle('Playwright');25 expect(await page.title()).to.be.equal('Playwright');26});27test('Test 7', async ({ page }) => {
Using AI Code Generation
// `chromium` and `test` were used without ever being imported, and `expect` /
// `_expect` were destructured from packages that do not export them.
const { chromium } = require("playwright");
const { test, expect } = require("@playwright/test");

// Kept so the second test can still assert through the `_expect` name.
const _expect = expect;

/**
 * Launch Chromium, open a fresh page and return its title.
 *
 * The browser is closed in a `finally` block so that it is not leaked when
 * opening the page or reading the title throws.
 *
 * @returns {Promise<string>} Title of the newly opened page.
 */
async function readNewPageTitle() {
  const browser = await chromium.launch();
  try {
    const page = await browser.newPage();
    // NOTE(review): nothing navigates the page before the title is read, so
    // this is the title of a blank page; a `page.goto(...)` call to the site
    // under test appears to be missing. Confirm the intended URL.
    return await page.title();
  } finally {
    await browser.close();
  }
}

test("Test to check if the page title is correct", async () => {
  const pageTitle = await readNewPageTitle();
  expect(pageTitle).toBe("The Internet");
});

// The title differs from the test above because Playwright Test rejects
// duplicate test titles within one file.
test("Test to check if the page title is correct (_expect alias)", async () => {
  const pageTitle = await readNewPageTitle();
  _expect(pageTitle).toBe("The Internet");
});
LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 automation test minutes FREE!!