How to use test_address method in Nose

Best Python code snippet using nose

testgriddaemons.py

Source:testgriddaemons.py Github

copy

Full Screen

1#!/usr/bin/python2# -*- coding: utf-8 -*-3#4# --- BEGIN_HEADER ---5#6# testgriddaemons- Set of unit tests for grid daemon helper functions7# Copyright (C) 2010-2020 The MiG Project lead by Brian Vinter8#9# This file is part of MiG.10#11# MiG is free software: you can redistribute it and/or modify12# it under the terms of the GNU General Public License as published by13# the Free Software Foundation; either version 2 of the License, or14# (at your option) any later version.15#16# MiG is distributed in the hope that it will be useful,17# but WITHOUT ANY WARRANTY; without even the implied warranty of18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the19# GNU General Public License for more details.20#21# You should have received a copy of the GNU General Public License22# along with this program; if not, write to the Free Software23# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.24#25# -- END_HEADER ---26#27"""Unit tests for grid daemon helper functions"""28from __future__ import print_function29import sys30import time31import logging32from mig.shared.griddaemons.ratelimits import default_max_user_hits, \33 expire_rate_limit, hit_rate_limit, update_rate_limit34from mig.shared.griddaemons.sessions import active_sessions, \35 clear_sessions, get_active_session, get_open_sessions, \36 track_open_session, track_close_session, track_close_expired_sessions37# TODO: Add unit test for validate_auth_attempt ?38if __name__ == "__main__":39 from mig.shared.conf import get_configuration_object40 conf = get_configuration_object()41 logging.basicConfig(filename=None, level=logging.INFO,42 format="%(asctime)s %(levelname)s %(message)s")43 conf.logger = logging44 test_proto, test_address, test_port, test_id, test_session_id = \45 'DUMMY', '127.0.0.42', 42000, \46 'user@some-domain.org', 'DUMMY_SESSION_ID'47 test_pw = "T3stp4ss"48 invalid_id = 'root'49 print("Running unit test on rate limit functions")50 print("Force expire all")51 expired 
= expire_rate_limit(conf, test_proto, fail_cache=0)52 print("Expired: %s" % expired)53 this_pw = test_pw54 print("Emulate rate limit")55 for i in range(default_max_user_hits-1):56 hit = hit_rate_limit(conf, test_proto, test_address, test_id)57 print("Blocked: %s" % hit)58 update_rate_limit(conf, test_proto, test_address, test_id, False,59 this_pw)60 print("Updated fail for %s:%s from %s" % \61 (test_id, this_pw, test_address))62 this_pw += 'x'63 time.sleep(1)64 hit = hit_rate_limit(conf, test_proto, test_address, test_id)65 print("Blocked: %s" % hit)66 print("Check with original user and password again")67 update_rate_limit(conf, test_proto, test_address, test_id, False, test_pw)68 hit = hit_rate_limit(conf, test_proto, test_address, test_id)69 print("Blocked: %s" % hit)70 print("Check with original user and new password again to hit limit")71 update_rate_limit(conf, test_proto, test_address, test_id, False, this_pw)72 hit = hit_rate_limit(conf, test_proto, test_address, test_id)73 print("Blocked: %s" % hit)74 other_proto, other_address = "BOGUS", '127.10.20.30'75 other_id, other_pw = 'other@some.org', "0th3rP4ss"76 print("Update with other proto")77 update_rate_limit(conf, other_proto, test_address, test_id, False, test_pw)78 print("Update with other address")79 update_rate_limit(conf, test_proto, other_address, test_id, False, test_pw)80 print("Update with other user")81 update_rate_limit(conf, test_proto, test_address, other_id, False, test_pw)82 print("Check with same user from other address")83 hit = hit_rate_limit(conf, test_proto, other_address, test_id)84 print("Blocked: %s" % hit)85 print("Check with other user from same address")86 hit = hit_rate_limit(conf, test_proto, test_address, other_id)87 print("Blocked: %s" % hit)88 time.sleep(2)89 print("Force expire some entries")90 expired = expire_rate_limit(conf, test_proto,91 fail_cache=default_max_user_hits)92 print("Expired: %s" % expired)93 print("Test reset on success")94 hit = hit_rate_limit(conf, 
test_proto, test_address, test_id)95 print("Blocked: %s" % hit)96 update_rate_limit(conf, test_proto, test_address, test_id, True, test_pw)97 print("Updated success for %s from %s" % (test_id, test_address))98 hit = hit_rate_limit(conf, test_proto, test_address, test_id)99 print("Blocked: %s" % hit)100 print("Check with same user from other address")101 hit = hit_rate_limit(conf, test_proto, other_address, test_id)102 print("Blocked: %s" % hit)103 print("Check with other user from same address")104 hit = hit_rate_limit(conf, test_proto, test_address, other_id)105 print("Blocked: %s" % hit)106 print("Check with invalid user from same address")107 hit = hit_rate_limit(conf, test_proto, test_address, invalid_id)108 print("Blocked: %s" % hit)109 print("Test active session counting")110 active_count = active_sessions(conf, test_proto, test_id)111 print("Open sessions: %d" % active_count)112 print("Clear sessions")113 clear_sessions(conf, test_proto)114 active_count = active_sessions(conf, test_proto, test_id)115 print("Open sessions: %d" % active_count)116 print("Track open session")117 track_open_session(conf, test_proto, test_id, test_address, test_port)118 active_count = active_sessions(conf, test_proto, test_id)119 print("Open sessions: %d" % active_count)120 print("Track open session")121 track_open_session(conf, test_proto, test_id, test_address, test_port+1)122 active_count = active_sessions(conf, test_proto, test_id)123 print("Open sessions: %d" % active_count)124 print("Track close session")125 track_close_session(conf, test_proto, test_id, test_address, test_port, )126 active_count = active_sessions(conf, test_proto, test_id)127 print("Open sessions: %d" % active_count)128 print("Track close session")129 track_close_session(conf, test_proto, test_id, test_address, test_port+1, )130 active_count = active_sessions(conf, test_proto, test_id)131 print("Open sessions: %d" % active_count)132 print("Test session tracking functions")133 expected_session_keys = 
['ip_addr',134 'tcp_port',135 'session_id',136 'authorized',137 'client_id',138 'timestamp']139 print("Track open session #1")140 open_session = track_open_session(conf,141 test_proto,142 test_id,143 test_address,144 test_port,145 test_session_id,146 authorized=True)147 active_count = active_sessions(conf, test_proto, test_id)148 if active_count != 1:149 print("ERROR: Excpected 1 active session(s) for: %s, found: %d" \150 % (test_id, active_count))151 sys.exit(1)152 active_count = active_sessions(conf, test_proto, test_id+"_1")153 if active_count != 0:154 print("ERROR: Excpected 0 active session(s) for: %s, found: %d" \155 % (test_id+"_1", active_count))156 sys.exit(1)157 if isinstance(open_session, dict):158 if open_session.keys() == expected_session_keys:159 print("OK")160 else:161 print("ERROR: Invalid session dictionary: '%s'" \162 % (open_session))163 sys.exit(1)164 else:165 print("ERROR: Expected dictionary: %s" % type(open_session))166 sys.exit(1)167 print("Track open session #2")168 open_session = track_open_session(conf,169 test_proto,170 test_id,171 test_address,172 test_port+1,173 test_session_id+"_1",174 authorized=True)175 active_count = active_sessions(conf, test_proto, test_id)176 if active_count != 2:177 print("ERROR: Excpected 2 active session(s) for: %s, found: %d" \178 % (test_id, active_count))179 sys.exit(1)180 active_count = active_sessions(conf, test_proto, test_id+"_1")181 if active_count != 0:182 print("ERROR: Excpected 0 active session(s) for: %s, found: %d" \183 % (test_id+"_1", active_count))184 sys.exit(1)185 if isinstance(open_session, dict):186 if open_session.keys() == expected_session_keys:187 print("OK")188 else:189 print("ERROR: Invalid session dictionary: '%s'" \190 % (open_session))191 sys.exit(1)192 else:193 print("ERROR: Expected dictionary: %s" % type(open_session))194 sys.exit(1)195 print("Track open session #3")196 open_session = track_open_session(conf,197 test_proto,198 test_id+"_1",199 test_address,200 test_port+2,201 
test_session_id+"_2",202 authorized=True)203 active_count = active_sessions(conf, test_proto, test_id)204 if active_count != 2:205 print("ERROR: Excpected 2 active session(s) for: %s, found: %d" \206 % (test_id, active_count))207 sys.exit(1)208 active_count = active_sessions(conf, test_proto, test_id+"_1")209 if active_count != 1:210 print("ERROR: Excpected 1 active session(s) for: %s, found: %d" \211 % (test_id+"_1", active_count))212 sys.exit(1)213 if isinstance(open_session, dict):214 if open_session.keys() == expected_session_keys:215 print("OK")216 else:217 print("ERROR: Invalid session dictionary: '%s'" \218 % (open_session))219 sys.exit(1)220 else:221 print("ERROR: Expected dictionary: %s" % type(open_session))222 sys.exit(1)223 print("Track open session #4")224 open_session = track_open_session(conf,225 test_proto,226 test_id+"_1",227 test_address,228 test_port+3,229 test_session_id+"_3",230 authorized=True)231 active_count = active_sessions(conf, test_proto, test_id)232 if active_count != 2:233 print("ERROR: Excpected 2 active session(s) for: %s, found: %d" \234 % (test_id, active_count))235 sys.exit(1)236 active_count = active_sessions(conf, test_proto, test_id+"_1")237 if active_count != 2:238 print("ERROR: Excpected 2 active session(s) for: %s, found: %d" \239 % (test_id+"_1", active_count))240 sys.exit(1)241 if isinstance(open_session, dict):242 if open_session.keys() == expected_session_keys:243 print("OK")244 else:245 print("ERROR: Invalid session dictionary: '%s'" \246 % (open_session))247 sys.exit(1)248 else:249 print("ERROR: Expected dictionary: %s" % type(open_session))250 print("Track get open sessions #1")251 cur_open_sessions = get_open_sessions(conf, 'INVALID')252 if isinstance(cur_open_sessions, dict):253 if not cur_open_sessions:254 print("OK")255 else:256 print("ERROR: Excpected empty dictionary: %s" \257 % cur_open_sessions)258 sys.exit(1)259 else:260 print("ERROR: Expected dictionary: %s" % type(cur_open_sessions))261 sys.exit(1)262 
print("Track get open sessions #2")263 cur_open_sessions = get_open_sessions(conf, test_proto, 'INVALID')264 if isinstance(cur_open_sessions, dict):265 if not cur_open_sessions:266 print("OK")267 else:268 print("ERROR: Excpected empty dictionary: %s" \269 % cur_open_sessions)270 sys.exit(1)271 else:272 print("ERROR: Expected dictionary: %s" % type(cur_open_sessions))273 sys.exit(1)274 print("Track get open sessions #3")275 cur_open_sessions = get_open_sessions(conf, test_proto)276 if isinstance(cur_open_sessions, dict):277 if len(cur_open_sessions.keys()) != 4:278 print("ERROR: Expected dictionary #keys: 4" \279 + ", found: %s, %s" % (len(cur_open_sessions.keys()),280 cur_open_sessions.keys()))281 sys.exit(1)282 status = True283 for (key, val) in cur_open_sessions.iteritems():284 if not isinstance(val, dict) \285 or val.keys() != expected_session_keys:286 status = False287 print("ERROR: Invalid session dictionary: '%s'" \288 % (val))289 sys.exit(1)290 if status:291 print("OK")292 else:293 print("ERROR: Expected dictionary: %s" % type(cur_open_sessions))294 sys.exit(1)295 print("Track get open sessions #4")296 cur_open_sessions = get_open_sessions(conf,297 test_proto,298 client_id=test_id)299 if isinstance(cur_open_sessions, dict):300 if len(cur_open_sessions.keys()) != 2:301 print("ERROR: Expected dictionary #keys: 2" \302 + ", found: %s, %s" % (len(cur_open_sessions.keys()),303 cur_open_sessions.keys()))304 sys.exit(1)305 status = True306 for (key, val) in cur_open_sessions.iteritems():307 if not isinstance(val, dict) \308 or val.keys() != expected_session_keys:309 status = False310 print("ERROR: Invalid session dictionary: '%s'" \311 % (val))312 sys.exit(1)313 if status:314 print("OK")315 else:316 print("ERROR: Expected dictionary: %s" % type(cur_open_sessions))317 sys.exit(1)318 print("Track get active session #1")319 active_session = get_active_session(conf,320 'INVALID',321 test_id,322 test_session_id)323 if isinstance(active_session, dict):324 if not 
active_session:325 print("OK")326 else:327 print("ERROR: Excpected empty dictionary: %s" \328 % active_session)329 sys.exit(1)330 else:331 print("ERROR: Expected dictionary: %s" % type(active_session))332 sys.exit(1)333 print("Track get active session #2")334 active_session = get_active_session(conf,335 test_proto,336 'INVALID',337 test_session_id)338 if isinstance(active_session, dict):339 if not active_session:340 print("OK")341 else:342 print("ERROR: Excpected empty dictionary: %s" \343 % active_session)344 sys.exit(1)345 else:346 print("ERROR: Expected dictionary: %s" % type(active_session))347 sys.exit(1)348 print("Track get active session #3")349 active_session = get_active_session(conf,350 test_proto,351 test_id,352 'INVALID')353 if isinstance(active_session, dict):354 if not active_session:355 print("OK")356 else:357 print("ERROR: Excpected empty dictionary: %s" \358 % active_session)359 sys.exit(1)360 else:361 print("ERROR: Expected dictionary: %s" % type(active_session))362 sys.exit(1)363 print("Track get active session #4")364 active_session = get_active_session(conf,365 test_proto,366 test_id,367 test_session_id)368 if isinstance(active_session, dict):369 if active_session.keys() == expected_session_keys:370 print("OK")371 else:372 print("ERROR: Invalid session dictionary: '%s'" \373 % (active_session))374 sys.exit(1)375 else:376 print("ERROR: Expected dictionary: %s" % type(active_session))377 sys.exit(1)378 print("Track close session #1")379 close_session = track_close_session(conf,380 'INVALID',381 test_id,382 test_address,383 test_port,384 session_id=test_session_id)385 active_count = active_sessions(conf, test_proto, test_id)386 if active_count != 2:387 print("ERROR: Excpected 2 active session(s) for: %s, found: %d" \388 % (test_id, active_count))389 sys.exit(1)390 active_count = active_sessions(conf, test_proto, test_id+"_1")391 if active_count != 2:392 print("ERROR: Excpected 2 active session(s) for: %s, found: %d" \393 % (test_id+"_1", 
active_count))394 sys.exit(1)395 if isinstance(close_session, dict):396 if not close_session:397 print("OK")398 else:399 print("ERROR: Excpected empty dictionary: %s" \400 % close_session)401 sys.exit(1)402 else:403 print("ERROR: Expected dictionary: %s" % type(close_session))404 sys.exit(1)405 print("Track close session #2")406 close_session = track_close_session(conf,407 test_proto,408 'INVALID',409 test_address,410 test_port,411 session_id=test_session_id)412 active_count = active_sessions(conf, test_proto, test_id)413 if active_count != 2:414 print("ERROR: Excpected 2 active session(s) for: %s, found: %d" \415 % (test_id, active_count))416 sys.exit(1)417 active_count = active_sessions(conf, test_proto, test_id+"_1")418 if active_count != 2:419 print("ERROR: Excpected 2 active session(s) for: %s, found: %d" \420 % (test_id+"_1", active_count))421 sys.exit(1)422 if isinstance(close_session, dict):423 if not close_session:424 print("OK")425 else:426 print("ERROR: Excpected empty dictionary: %s" \427 % close_session)428 sys.exit(1)429 else:430 print("ERROR: Expected dictionary: %s" % type(close_session))431 sys.exit(1)432 print("Track close session #3")433 close_session = track_close_session(conf,434 test_proto,435 test_id,436 test_address,437 test_port,438 session_id=None)439 active_count = active_sessions(conf, test_proto, test_id)440 if active_count != 2:441 print("ERROR: Excpected 2 active session(s) for: %s, found: %d" \442 % (test_id, active_count))443 sys.exit(1)444 active_count = active_sessions(conf, test_proto, test_id+"_1")445 if active_count != 2:446 print("ERROR: Excpected 2 active session(s) for: %s, found: %d" \447 % (test_id+"_1", active_count))448 sys.exit(1)449 if isinstance(close_session, dict):450 if not close_session:451 print("OK")452 else:453 print("ERROR: Excpected empty dictionary: %s" \454 % close_session)455 sys.exit(1)456 else:457 print("ERROR: Expected dictionary: %s" % type(close_session))458 sys.exit(1)459 print("Track close session 
#4")460 close_session = track_close_session(conf,461 test_proto,462 test_id,463 test_address,464 test_port,465 session_id=test_session_id)466 active_count = active_sessions(conf, test_proto, test_id)467 if active_count != 1:468 print("ERROR: Excpected 1 active session(s) for: %s, found: %d" \469 % (test_id, active_count))470 sys.exit(1)471 active_count = active_sessions(conf, test_proto, test_id+"_1")472 if active_count != 2:473 print("ERROR: Excpected 2 active session(s) for: %s, found: %d" \474 % (test_id+"_1", active_count))475 sys.exit(1)476 if isinstance(close_session, dict):477 if close_session.keys() == expected_session_keys:478 print("OK")479 else:480 print("ERROR: Invalid session dictionary: '%s'" \481 % (close_session))482 sys.exit(1)483 else:484 print("ERROR: Expected dictionary: %s" % type(close_session))485 sys.exit(1)486 print("Track close expired sessions #1")487 expired_sessions = track_close_expired_sessions(conf,488 'INVALID')489 active_count = active_sessions(conf, test_proto, test_id)490 if active_count != 1:491 print("ERROR: Excpected 1 active session(s) for: %s, found: %d" \492 % (test_id, active_count))493 sys.exit(1)494 active_count = active_sessions(conf, test_proto, test_id+"_1")495 if active_count != 2:496 print("ERROR: Excpected 2 active session(s) for: %s, found: %d" \497 % (test_id+"_1", active_count))498 sys.exit(1)499 if isinstance(expired_sessions, dict):500 if not expired_sessions:501 print("OK")502 else:503 print("ERROR: Excpected empty dictionary: %s" \504 % expired_sessions)505 sys.exit(1)506 else:507 print("ERROR: Expected dictionary: %s" % type(expired_sessions))508 sys.exit(1)509 print("Track close expired sessions #2")510 expired_sessions = track_close_expired_sessions(conf,511 test_proto,512 'INVALID')513 active_count = active_sessions(conf, test_proto, test_id)514 if active_count != 1:515 print("ERROR: Excpected 1 active session(s) for: %s, found: %d" \516 % (test_id, active_count))517 sys.exit(1)518 active_count = 
active_sessions(conf, test_proto, test_id+"_1")519 if active_count != 2:520 print("ERROR: Excpected 2 active session(s) for: %s, found: %d" \521 % (test_id+"_1", active_count))522 sys.exit(1)523 if isinstance(expired_sessions, dict):524 if not expired_sessions:525 print("OK")526 else:527 print("ERROR: Excpected empty dictionary: %s" \528 % expired_sessions)529 sys.exit(1)530 else:531 print("ERROR: Expected dictionary: %s" % type(expired_sessions))532 sys.exit(1)533 print("Track close expired sessions #3")534 expired_sessions = track_close_expired_sessions(conf,535 test_proto,536 client_id=test_id)537 active_count = active_sessions(conf, test_proto, test_id)538 if active_count != 0:539 print("ERROR: Excpected 0 active session(s) for: %s, found: %d" \540 % (test_id, active_count))541 sys.exit(1)542 active_count = active_sessions(conf, test_proto, test_id+"_1")543 if active_count != 2:544 print("ERROR: Excpected 2 active session(s) for: %s, found: %d" \545 % (test_id+"_1", active_count))546 sys.exit(1)547 if isinstance(expired_sessions, dict):548 if len(expired_sessions.keys()) == 1:549 status = True550 for (key, val) in expired_sessions.iteritems():551 if not isinstance(val, dict) \552 or val.keys() != expected_session_keys:553 status = False554 print("ERROR: Invalid session dictionary: '%s'" \555 % (val))556 sys.exit(1)557 if status:558 print("OK")559 else:560 print("ERROR: Expected 1 expired session, found: %s" \561 % len(expired_sessions.keys()))562 sys.exit(1)563 else:564 print("ERROR: Expected dictionary: %s" % type(expired_sessions))565 sys.exit(1)566 print("Track close expired sessions #4")567 expired_sessions = track_close_expired_sessions(conf,568 test_proto)569 active_count = active_sessions(conf, test_proto, test_id)570 if active_count != 0:571 print("ERROR: Excpected 0 active session(s) for: %s, found: %d" \572 % (test_id, active_count))573 sys.exit(1)574 active_count = active_sessions(conf, test_proto, test_id+"_1")575 if active_count != 0:576 
print("ERROR: Excpected 0 active session(s) for: %s, found: %d" \577 % (test_id+"_1", active_count))578 sys.exit(1)579 if isinstance(expired_sessions, dict):580 if len(expired_sessions.keys()) == 2:581 status = True582 for (key, val) in expired_sessions.iteritems():583 if not isinstance(val, dict) \584 or val.keys() != expected_session_keys:585 status = False586 print("ERROR: Invalid session dictionary: '%s'" \587 % (val))588 sys.exit(1)589 if status:590 print("OK")591 else:592 print("ERROR: Expected 2 expired session, found: %s" \593 % len(expired_sessions.keys()))594 sys.exit(1)595 else:596 print("ERROR: Expected dictionary: %s" % type(expired_sessions))...

Full Screen

Full Screen

address_tests.py

Source:address_tests.py Github

copy

Full Screen

#***
#*  GroceryDirect - Address Tests
#*
#*  Author: Melanie Cornelius, mseryn
#*  Written for CS 425-02 Fall 2016 Final Project
#*
#*  Addresses:
#*  have:
#*      -- street
#*      -- apt (optional)
#*      -- city
#*      -- state
#*      -- zip code
#*      -- type
#*  get:
#*      -- street
#*      -- apt (if exists)
#*      -- city
#*      -- state
#*      -- zip code
#*      -- full address string?
#*      -- type
#*  modify:
#*      -- street
#*      -- apt
#*      -- city
#*      -- state
#*      -- zip code
#*      -- type
#***

import address
import person

# Default strings for tests
TEST_STREET = "123 Test St."
TEST_APT_NO = "Test Apt No. 1"
TEST_CITY = "Test City"
TEST_STATE_CODE = "AZ"
TEST_ZIP_CODE = 12345
TEST_TYPE_STRING = "shipping"

# Default strings for test person
FNAME = "FirstName"
LNAME = "LastName"
MIDDLE_INIT = "M"
USERNAME = "testaccount"
PASSWORD = "testpassword"

# Test objects (avoid cluttering db): reuse the existing test account when the
# credentials check returns one, and only create a new person otherwise.
CUSTOMER = person.check_credentials(USERNAME, PASSWORD)
if not CUSTOMER:
    CUSTOMER = person.Person.new_person(
        USERNAME, PASSWORD, FNAME, LNAME, "customer",
        middle_initial=MIDDLE_INIT)


def _new_test_address(**extra):
    """Return a new address built from the default TEST_* values.

    Any keyword arguments (e.g. ``apt_no``) are passed straight through to
    ``address.Address.new_address``.
    """
    return address.Address.new_address(
        TEST_STREET, TEST_CITY, TEST_STATE_CODE, TEST_ZIP_CODE,
        TEST_TYPE_STRING, **extra)


def _check_modify_type(new_type_string):
    """Change a default address to `new_type_string` and verify get_type()."""
    test_address = _new_test_address()
    test_address.modify_type(new_type_string)
    assert test_address.get_type() == new_type_string, (
        "modify_type() did not modify and return type for change to "
        "%s type address" % (new_type_string))


#******************************************************************************
#** ADDRESS: PERSON
#******************************************************************************

def test_address_get_null_person():
    """A new address has a person whose id is None."""
    test_address = _new_test_address()
    # Identity check: None is a singleton, so `is` is the correct comparison.
    assert test_address.get_person().get_id() is None, (
        "get_person did not correctly return null for default for address")


def test_address_add_person():
    """add_person() attaches the person that get_person() then returns."""
    test_address = _new_test_address()
    test_address.add_person(CUSTOMER)
    assert test_address.get_person().get_id() == CUSTOMER.get_id(), (
        "get and add person did not correctly function for address")


#******************************************************************************
#** ADDRESS: DEFAULT FLAG
#******************************************************************************

def test_address_get_default_flag():
    """The default flag is False on an unmodified address."""
    test_address = _new_test_address()
    assert test_address.get_default_flag() == False, (
        "default flag did not return False for unmodified address")


def test_address_set_default():
    """set_default_flag(True) is reflected by get_default_flag()."""
    test_address = _new_test_address()
    test_address.set_default_flag(True)
    assert test_address.get_default_flag() == True, (
        "set default flag failed to modify flag to True for address "
        "without person")


def test_address_remove_default():
    """The default flag can be switched back to False after being set."""
    test_address = _new_test_address()
    test_address.set_default_flag(True)
    test_address.set_default_flag(False)
    assert test_address.get_default_flag() == False, (
        "set default flag failed to modify flag to False for address "
        "without person")


#******************************************************************************
#** ADDRESS: STREET
#******************************************************************************

def test_address_get_street():
    """get_street() returns the street given at creation."""
    test_address = _new_test_address()
    assert test_address.get_street() == TEST_STREET, (
        "get_street() did not return street name for address")


def test_address_modify_street():
    """modify_street() replaces the stored street."""
    new_street_string = "321 New Test Street"
    test_address = _new_test_address()
    test_address.modify_street(new_street_string)
    assert test_address.get_street() == new_street_string, (
        "modify_street() did not modify and return street name for address")


#******************************************************************************
#** ADDRESS: APARTMENT NUMBER
#******************************************************************************

def test_address_get_apt():
    """get_apartment_no() returns the apt_no given at creation."""
    test_address = _new_test_address(apt_no=TEST_APT_NO)
    assert test_address.get_apartment_no() == TEST_APT_NO, (
        "get_apartment_no() did not return apartment no. line for address")


def test_address_get_apt_none():
    """Without an apt_no, get_apartment_no() returns the empty string."""
    test_address = _new_test_address()
    assert test_address.get_apartment_no() == "", (
        "get_apartment_no() did not return empty string for nonexistent "
        "apartment no. line for address")


def test_address_modify_apt():
    """modify_apartment_no() sets the apartment line on an address."""
    test_address = _new_test_address()
    test_address.modify_apartment_no(TEST_APT_NO)
    assert test_address.get_apartment_no() == TEST_APT_NO, (
        "modify_apartment_no() did not modify and return apartment no. "
        "line for address")


#******************************************************************************
#** ADDRESS: CITY
#******************************************************************************

def test_address_get_city():
    """get_city() returns the city given at creation."""
    test_address = _new_test_address()
    assert test_address.get_city() == TEST_CITY, (
        "get_city() did not return city for address")


def test_address_modify_city():
    """modify_city() replaces the stored city."""
    new_city_string = "New Test City"
    test_address = _new_test_address()
    test_address.modify_city(new_city_string)
    assert test_address.get_city() == new_city_string, (
        "modify_city() did not modify and return city for address")


#******************************************************************************
#** ADDRESS: ZIP_CODE
#******************************************************************************

def test_address_get_zip_code():
    """get_zip_code() returns the zip code given at creation."""
    test_address = _new_test_address()
    assert test_address.get_zip_code() == TEST_ZIP_CODE, (
        "get_zip_code() did not return zipcode for address")


def test_address_modify_zip_code():
    """modify_zip_code() accepts a valid five-digit number."""
    new_zipcode = 99999
    test_address = _new_test_address()
    test_address.modify_zip_code(new_zipcode)
    assert test_address.get_zip_code() == new_zipcode, (
        "modify_zip_code() did not modify and return zipcode for address")


def test_address_modify_zip_invalid_type():
    """A non-numeric zip code must leave the stored zip code unchanged."""
    new_zipcode = "invalid string"
    test_address = _new_test_address()
    test_address.modify_zip_code(new_zipcode)
    assert test_address.get_zip_code() == TEST_ZIP_CODE, (
        "modify_zip_code() did not appropriatly handle invalid zip code "
        "for address")


def test_address_modify_zip_invalid_number():
    """A six-digit zip code must leave the stored zip code unchanged."""
    new_zipcode = 999999
    test_address = _new_test_address()
    test_address.modify_zip_code(new_zipcode)
    assert test_address.get_zip_code() == TEST_ZIP_CODE, (
        "modify_zip_code() did not appropriatly handle invalid numeric "
        "zip code for address")


#******************************************************************************
#** ADDRESS: STATE
#******************************************************************************

def test_address_get_state():
    """get_state() returns the state code given at creation."""
    test_address = _new_test_address()
    assert test_address.get_state() == TEST_STATE_CODE, (
        "get_state() did not return state for address")


def test_address_modify_state():
    """modify_state() accepts another state code."""
    new_state_code = "MN"
    test_address = _new_test_address()
    test_address.modify_state(new_state_code)
    assert test_address.get_state() == new_state_code, (
        "modify_state() did not modify and return state for address")


def test_address_modify_state_invalid():
    """The code "LL" must be rejected, leaving the state unchanged."""
    new_state_code = "LL"
    test_address = _new_test_address()
    test_address.modify_state(new_state_code)
    assert test_address.get_state() == TEST_STATE_CODE, (
        "modify_state() did not appropriatly handle invalid state code "
        "for address")


#******************************************************************************
#** ADDRESS: TYPE
#******************************************************************************

def test_address_get_type():
    """get_type() returns the type string given at creation."""
    test_address = _new_test_address()
    assert test_address.get_type() == TEST_TYPE_STRING, (
        "get_type() did not return type for address")


def test_address_modify_type_to_warehouse():
    """An address can be changed to the "warehouse" type."""
    _check_modify_type("warehouse")


def test_address_modify_type_to_billing():
    """An address can be changed to the "billing" type."""
    _check_modify_type("billing")


def test_address_modify_type_to_supplier():
    """An address can be changed to the "supplier" type."""
    _check_modify_type("supplier")


def test_address_modify_type_invalid():
    """An unknown type string must leave the stored type unchanged."""
    new_type_string = "invalid string"
    test_address = _new_test_address()
    test_address.modify_type(new_type_string)
    # NOTE(review): the listing this file was recovered from is cut off right
    # after this assertion's condition; the original failure message is not
    # visible, so the message below is a stand-in - confirm against upstream.
    assert test_address.get_type() == TEST_TYPE_STRING, (
        "modify_type() did not appropriatly handle invalid type string "
        "for address")

Full Screen

Full Screen

test_services.py

Source:test_services.py Github

copy

Full Screen

"""Test KNX services."""
import pytest
from xknx.telegram.apci import GroupValueResponse, GroupValueWrite

from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant

from .conftest import KNXTestKit

from tests.common import async_capture_events


@pytest.mark.parametrize(
    "service_payload,expected_telegrams,expected_apci",
    [
        # send DPT 1 telegram
        (
            {"address": "1/2/3", "payload": True, "response": True},
            [("1/2/3", True)],
            GroupValueResponse,
        ),
        (
            {"address": "1/2/3", "payload": True, "response": False},
            [("1/2/3", True)],
            GroupValueWrite,
        ),
        # send DPT 5 telegram
        (
            {"address": "1/2/3", "payload": [99], "response": True},
            [("1/2/3", (99,))],
            GroupValueResponse,
        ),
        (
            {"address": "1/2/3", "payload": [99], "response": False},
            [("1/2/3", (99,))],
            GroupValueWrite,
        ),
        # send DPT 5 percent telegram
        (
            {"address": "1/2/3", "payload": 99, "type": "percent", "response": True},
            [("1/2/3", (0xFC,))],
            GroupValueResponse,
        ),
        (
            {"address": "1/2/3", "payload": 99, "type": "percent", "response": False},
            [("1/2/3", (0xFC,))],
            GroupValueWrite,
        ),
        # send temperature DPT 9 telegram
        (
            {
                "address": "1/2/3",
                "payload": 21.0,
                "type": "temperature",
                "response": True,
            },
            [("1/2/3", (0x0C, 0x1A))],
            GroupValueResponse,
        ),
        (
            {
                "address": "1/2/3",
                "payload": 21.0,
                "type": "temperature",
                "response": False,
            },
            [("1/2/3", (0x0C, 0x1A))],
            GroupValueWrite,
        ),
        # send multiple telegrams
        (
            {
                "address": ["1/2/3", "2/2/2", "3/3/3"],
                "payload": 99,
                "type": "percent",
                "response": True,
            },
            [
                ("1/2/3", (0xFC,)),
                ("2/2/2", (0xFC,)),
                ("3/3/3", (0xFC,)),
            ],
            GroupValueResponse,
        ),
        (
            {
                "address": ["1/2/3", "2/2/2", "3/3/3"],
                "payload": 99,
                "type": "percent",
                "response": False,
            },
            [
                ("1/2/3", (0xFC,)),
                ("2/2/2", (0xFC,)),
                ("3/3/3", (0xFC,)),
            ],
            GroupValueWrite,
        ),
    ],
)
async def test_send(
    hass: HomeAssistant,
    knx: KNXTestKit,
    service_payload,
    expected_telegrams,
    expected_apci,
):
    """Test `knx.send` service.

    Each parametrized case calls the service once with ``service_payload``
    and then expects one telegram per ``(group_address, payload)`` pair in
    ``expected_telegrams``, in order, each carrying ``expected_apci``
    (``GroupValueResponse`` when ``"response"`` is true, ``GroupValueWrite``
    otherwise).
    """
    await knx.setup_integration({})

    await hass.services.async_call(
        "knx",
        "send",
        service_payload,
        blocking=True,
    )

    for group_address, payload in expected_telegrams:
        await knx.assert_telegram(group_address, payload, expected_apci)


async def test_read(hass: HomeAssistant, knx: KNXTestKit):
    """Test `knx.read` service.

    Covers a single address as well as a list of addresses; a read is
    asserted for every requested address.
    """
    await knx.setup_integration({})

    # send read telegram
    await hass.services.async_call("knx", "read", {"address": "1/1/1"}, blocking=True)
    await knx.assert_read("1/1/1")

    # send multiple read telegrams
    await hass.services.async_call(
        "knx",
        "read",
        {"address": ["1/1/1", "2/2/2", "3/3/3"]},
        blocking=True,
    )
    await knx.assert_read("1/1/1")
    await knx.assert_read("2/2/2")
    await knx.assert_read("3/3/3")


async def test_event_register(hass: HomeAssistant, knx: KNXTestKit):
    """Test `knx.event_register` service.

    Walks through four phases on one group address: nothing registered,
    registered with a ``type``, registration removed, and registered without
    a ``type``. ``events`` collects every ``knx_event`` fired meanwhile.
    """
    events = async_capture_events(hass, "knx_event")
    test_address = "1/2/3"

    await knx.setup_integration({})

    # no event registered
    await knx.receive_write(test_address, True)
    await hass.async_block_till_done()
    assert len(events) == 0

    # register event with `type`
    await hass.services.async_call(
        "knx",
        "event_register",
        {"address": test_address, "type": "2byte_unsigned"},
        blocking=True,
    )
    await knx.receive_write(test_address, (0x04, 0xD2))
    await hass.async_block_till_done()
    assert len(events) == 1
    # pop() empties the list again so later length checks start from zero
    typed_event = events.pop()
    assert typed_event.data["data"] == (0x04, 0xD2)
    # 0x04D2 == 1234
    assert typed_event.data["value"] == 1234

    # remove event registration - no event added
    await hass.services.async_call(
        "knx",
        "event_register",
        {"address": test_address, "remove": True},
        blocking=True,
    )
    await knx.receive_write(test_address, True)
    await hass.async_block_till_done()
    assert len(events) == 0

    # register event without `type`
    await hass.services.async_call(
        "knx", "event_register", {"address": test_address}, blocking=True
    )
    await knx.receive_write(test_address, True)
    await knx.receive_write(test_address, False)
    await hass.async_block_till_done()
    assert len(events) == 2
    # events are popped newest-first: the `False` write was received last
    untyped_event_2 = events.pop()
    assert untyped_event_2.data["data"] is False
    assert untyped_event_2.data["value"] is None
    untyped_event_1 = events.pop()
    assert untyped_event_1.data["data"] is True
    assert untyped_event_1.data["value"] is None


async def test_exposure_register(hass: HomeAssistant, knx: KNXTestKit):
    """Test `knx.exposure_register` service.

    Checks that state changes of ``test_entity`` produce telegrams only while
    an exposure is registered, both for the entity state (``binary``) and for
    an attribute with a default (``percentU8``).
    """
    test_address = "1/2/3"
    test_entity = "fake.entity"
    test_attribute = "fake_attribute"

    await knx.setup_integration({})

    # no exposure registered
    hass.states.async_set(test_entity, STATE_ON, {})
    await knx.assert_no_telegram()

    # register exposure
    await hass.services.async_call(
        "knx",
        "exposure_register",
        {"address": test_address, "entity_id": test_entity, "type": "binary"},
        blocking=True,
    )
    hass.states.async_set(test_entity, STATE_OFF, {})
    await knx.assert_write(test_address, False)

    # remove exposure - state changes no longer produce telegrams
    await hass.services.async_call(
        "knx",
        "exposure_register",
        {"address": test_address, "remove": True},
        blocking=True,
    )
    hass.states.async_set(test_entity, STATE_ON, {})
    await knx.assert_no_telegram()

    # register exposure for attribute with default
    await hass.services.async_call(
        "knx",
        "exposure_register",
        {
            "address": test_address,
            "entity_id": test_entity,
            "attribute": test_attribute,
            "type": "percentU8",
            "default": 0,
        },
        blocking=True,
    )
    # no attribute on first change wouldn't work because no attribute change since last test
    hass.states.async_set(test_entity, STATE_ON, {test_attribute: 30})
    await knx.assert_write(test_address, (30,))
    # attribute missing -> the registered default (0) is written
    hass.states.async_set(test_entity, STATE_OFF, {})
    await knx.assert_write(test_address, (0,))

    # don't send same value sequentially
    hass.states.async_set(test_entity, STATE_ON, {test_attribute: 25})
    hass.states.async_set(test_entity, STATE_ON, {test_attribute: 25})
    hass.states.async_set(test_entity, STATE_ON, {test_attribute: 25, "unrelated": 2})
    hass.states.async_set(test_entity, STATE_OFF, {test_attribute: 25})
    await knx.assert_telegram_count(1)
    # ... (the listing is truncated here; any remaining statements are not shown)

Full Screen

Full Screen

test_utils.py

Source:test_utils.py Github

copy

Full Screen

...48 except ValueError:49 pass50 else:51 self.fail("Nonsense test name should throw ValueError")52 def test_test_address(self):53 # test addresses are specified as54 # package.module:class.method55 # /path/to/file.py:class.method56 # converted into 3-tuples (file, module, callable)57 # all terms optional58 test_address = util.test_address59 absfile = util.absfile60 class Foo:61 def bar(self):62 pass63 def baz():64 pass65 f = Foo()66 class FooTC(unittest.TestCase):67 def test_one(self):68 pass69 def test_two(self):70 pass71 class CustomTestType(type):72 pass73 class CustomTC(unittest.TestCase):74 __metaclass__ = CustomTestType75 def test_one(self):76 pass77 def test_two(self):78 pass79 foo_funct = case.FunctionTestCase(baz)80 foo_functu = unittest.FunctionTestCase(baz)81 foo_mtc = case.MethodTestCase(unbound_method(Foo, Foo.bar))82 me = util.src(absfile(__file__))83 self.assertEqual(test_address(baz),84 (me, __name__, 'baz'))85 assert test_address(Foo) == (me, __name__, 'Foo')86 assert test_address(unbound_method(Foo, Foo.bar)) == (me, __name__,87 'Foo.bar')88 assert test_address(f) == (me, __name__, 'Foo')89 assert test_address(f.bar) == (me, __name__, 'Foo.bar')90 assert test_address(nose) == (91 util.src(absfile(nose.__file__)), 'nose', None)92 # test passing the actual test callable, as the93 # missed test plugin must do94 self.assertEqual(test_address(FooTC('test_one')),95 (me, __name__, 'FooTC.test_one'))96 self.assertEqual(test_address(CustomTC('test_one')),97 (me, __name__, 'CustomTC.test_one'))98 self.assertEqual(test_address(foo_funct),99 (me, __name__, 'baz'))100 self.assertEqual(test_address(foo_functu),101 (me, __name__, 'baz'))102 self.assertEqual(test_address(foo_mtc),103 (me, __name__, 'Foo.bar'))104 # verify that we fail on an invalid input type105 self.assertRaises(TypeError, test_address, 1)106 self.assertRaises(TypeError, test_address, "foo")107 def test_isclass_detects_classes(self):108 class TC(unittest.TestCase):109 pass110 class 
TC_Classic:111 pass112 class TC_object(object):113 pass114 # issue153 -- was not detecting custom typed classes...115 class TCType(type):116 pass...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Nose automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful