How to use result method in devicefarmer-stf

Best JavaScript code snippet using devicefarmer-stf

request_handler.py

Source:request_handler.py Github

copy

Full Screen

1import hashlib2from server.async_database import *3from asyncio import StreamReader, StreamWriter4from server.async_database import *5import aiomysql6from server.data_buffer import DataBuffer7import random8import datetime9class RequestHandler:10 def __init__(self, data_buffer: DataBuffer):11 self.request = []12 self.data_buffer = data_buffer13 async def Request_Binding(self, request_number):14 result = None15 request_number = int(request_number)16 # 0번 요청은 사용하지 않는다.17 # 요청 넘버는 2의 배수로 정한다18 if request_number == 0:19 pass20 if request_number == 2:21 echo = EchoRequest(self.data_buffer)22 result = await echo.main()23 elif request_number == 4:24 select = LoginRequest(self.data_buffer)25 result = await select.main()26 elif request_number == 6:27 create = CheckRegisterIdRequest(self.data_buffer)28 result = await create.main()29 elif request_number == 8:30 insert = CreateNewAccount(self.data_buffer)31 result = await insert.main()32 elif request_number == 10:33 insert = GetCaptchaTestSet(self.data_buffer)34 result = await insert.main()35 elif request_number == 12:36 insert = GetLastDriveDate(self.data_buffer)37 result = await insert.main()38 elif request_number == 14:39 insert = GetCaptchaTestSet2(self.data_buffer)40 result = await insert.main()41 elif request_number == 16:42 insert = GetUserInfo(self.data_buffer)43 result = await insert.main()44 elif request_number == 18:45 insert = GetAllUserInfo(self.data_buffer)46 result = await insert.main()47 elif request_number == 20:48 insert = UpdateUserInfo(self.data_buffer)49 result = await insert.main()50 elif request_number == 22:51 insert = UpdateLastDriveDate(self.data_buffer)52 result = await insert.main()53 elif request_number == 24:54 insert = UpdateAlcoholCount(self.data_buffer)55 result = await insert.main()56 57 elif request_number == 26:58 insert = FindUserId(self.data_buffer)59 result = await insert.main()60 elif request_number == 28:61 insert = FindUserPw(self.data_buffer)62 result = await insert.main()63 return result64'''65현재 EchoRequest 로직에서만 66데이터 버퍼를 이용해 데이터를 처리 하도록함.67차후 생성하는 객체 모두에 다음과 같이 적용 시킬것68'''69# 0번 요청으로 사용하지 않는다70# 테이블 생성을 위한 요청71class CreateTableRequest(RequestHandler):72 def __init__(self, data_buffer: DataBuffer):73 super().__init__(data_buffer)74 async def main(self):75 temp = self.data_buffer.get_data()76 table_create_query = "CREATE TABLE person " \77 "( _id INT AUTO_INCREMENT, name VARCHAR(32) NOT NULL, " \78 "belong VARCHAR(12) DEFAULT 'FOO', " \79 "phone VARCHAR(12), PRIMARY KEY(_id) ) " \80 "ENGINE=INNODB"81 result = await query_operator(table_create_query)82 return result83# 2번 요청84# 에코 요청으로, 받은 데이터를 그대로 보내준다.85class EchoRequest(RequestHandler):86 def __init__(self, data_buffer: DataBuffer):87 super().__init__(data_buffer)88 async def main(self):89 try:90 result = self.data_buffer.get_data()91 if result:92 return result93 else:94 result = "false"95 return result96 except:97 result = "false"98 return result99# 4번 요청100# 로그인 요청101class LoginRequest(RequestHandler):102 def __init__(self, data_buffer: DataBuffer):103 super().__init__(data_buffer)104 async def main(self):105 temp = self.data_buffer.get_data()106 table_name = "member"107 login_query = "select * from" + " " + table_name108 try:109 result = await query_operator(login_query)110 id = None111 pw = None112 temp = temp.replace("[", "", 1)113 temp = temp.replace("]", "", 1)114 temp = temp.replace(" ", "")115 temp = temp.replace("'", "")116 temp = temp.split(',')117 for i in temp:118 i = i.split('=')119 if (i[0] == "user_id"):120 id = i[1]121 if (i[0] == "user_pw"):122 pw = i[1]123 db_id = None124 db_pw = None125 for i in result:126 db_id = i['id']127 db_pw = i['passwd']128 if (id == db_id):129 if (pw == db_pw):130 result = "true"131 return result132 else:133 result = "false"134 else:135 result = "false"136 return result137 except:138 result = "false"139 return result140# 6번 요청141# 회원가입시에 아이디 중복 검사를 위한 클래스142class CheckRegisterIdRequest(RequestHandler):143 def __init__(self, data_buffer: DataBuffer):144 super().__init__(data_buffer)145 async def main(self):146 temp = self.data_buffer.get_data()147 table_name = "member"148 get_all_id_query = "select id from" + " " + table_name149 try:150 result = await query_operator(get_all_id_query)151 id = None152 temp = temp.replace("[", "", 1)153 temp = temp.replace("]", "", 1)154 temp = temp.replace(" ", "")155 temp = temp.replace("'", "")156 temp = temp.split(',')157 for i in temp:158 i = i.split('=')159 if (i[0] == "user_id"):160 id = i[1]161 db_id = None162 for i in result:163 db_id = i['id']164 if (id == db_id):165 result = "true"166 return result167 else:168 result = "false"169 return result170 except:171 result = "false"172 return result173# 8번 요청174# 새로운 계정 생성 하는 객체175class CreateNewAccount(RequestHandler):176 def __init__(self, data_buffer: DataBuffer):177 super().__init__(data_buffer)178 async def main(self):179 temp = self.data_buffer.get_data()180 try:181 id = None182 passwd = None183 name = None184 tel = None185 temp = temp.replace("[", "", 1)186 temp = temp.replace("]", "", 1)187 temp = temp.replace(" ", "")188 temp = temp.replace("'", "")189 temp = temp.split(',')190 for i in temp:191 i = i.split('=')192 if (i[0] == "user_id"):193 id = i[1]194 if (i[0] == "user_pw"):195 passwd = i[1]196 if (i[0] == "user_name"):197 name = i[1]198 if (i[0] == "user_tel"):199 tel = i[1]200 current_time = (datetime.datetime.now()).strftime('%y-%m-%d')201 create_new_account_query = "INSERT INTO member (id, passwd, name, tel, last_drive_date) " \202 "VALUES('" + str(id) + "', '" + str(passwd) + "', '" + str(name) + "', '" + str(tel) + "', '" + current_time + "');"203 '''204 name = "asdfasdf"205 pw = "asdfadf"206 data_insert_query = "INSERT INTO person (userid, passwd) VALUES('" + str(name) + "', '" + str(pw) + "');"207 '''208 try:209 result = await test_example_execute(create_new_account_query)210 return result211 except:212 result = "false"213 return result214 result = "true"215 return result216 except:217 result = "false"218 return result219# 10번 요청220# 캡차 문제와 정답을 가져오는 클래스221class GetCaptchaTestSet(RequestHandler):222 def __init__(self, data_buffer: DataBuffer):223 super().__init__(data_buffer)224 async def main(self):225 temp = self.data_buffer.get_data()226 try:227 # 캡챠 문제는 늘릴것 현재는 3번 문제까지 있음228 answer = ""229 test_set_num = random.randrange(1, 100)230 test_set_num %= 9231 # 디비는 1번 부터 시작하므로, +1을 해줘야함232 test_set_num += 1233 check_captcha_answer_query = "SELECT captcha_answer FROM captcha where captcha_num=" + "'" + str(test_set_num) + "';"234 try:235 result = await query_operator(check_captcha_answer_query)236 result = result[0]237 result = result.get('captcha_answer')238 return result239 except:240 result = "false"241 return result242 return result243 except:244 result = "false"245 return result246# 12번 요청247# 마지막 운전 날짜를 가져오는 클래스248class GetLastDriveDate(RequestHandler):249 def __init__(self, data_buffer: DataBuffer):250 super().__init__(data_buffer)251 async def main(self):252 temp = self.data_buffer.get_data()253 user_id=None254 temp = temp.replace("[", "", 1)255 temp = temp.replace("]", "", 1)256 temp = temp.replace(" ", "")257 temp = temp.replace("'", "")258 temp = temp.split(',')259 for i in temp:260 i = i.split('=')261 if (i[0] == "user_id"):262 user_id = i[1]263 try:264 query = "SELECT last_drive_date FROM member where id="+"'"+user_id + "';"265 try:266 result = await query_operator(query)267 result = result[0]268 result = result.get('last_drive_date')269 # result = result.split(" ")270 # temp1 = result[0]271 temp1 = str(result)272 # temp2 = result[1]273 temp1 = temp1.split("-")274 # temp2 = temp2.split(":")275 ''' 마지막 접속 날짜 '''276 year = int(temp1[0])277 month = int(temp1[1])278 day = int(temp1[2])279 ''' 현재 날짜 '''280 now = datetime.datetime.today()281 now = str(now).split()282 now = str(now[0]).split("-")283 now_year = int(now[0])284 now_month = int(now[1])285 now_day = int(now[2])286 ''' 날짜 연산 시작 '''287 last_access_date = datetime.date(year, month, day)288 now_date = datetime.date(now_year, now_month, now_day)289 delta = now_date - last_access_date290 result = str(delta.days)291 return result292 except:293 result = "false"294 return result295 except:296 result = "false"297 return result298# 14번 요청299# 캡차 문제와 정답을 가져오는 클래스300class GetCaptchaTestSet2(RequestHandler):301 def __init__(self, data_buffer: DataBuffer):302 super().__init__(data_buffer)303 async def main(self):304 temp = self.data_buffer.get_data()305 try:306 captcha2_number = None307 captcha2_answer = None308 temp = temp.replace("[", "", 1)309 temp = temp.replace("]", "", 1)310 temp = temp.replace(" ", "")311 temp = temp.replace("'", "")312 temp = temp.split(',')313 for i in temp:314 i = i.split('=')315 if (i[0] == "captcha2_number"):316 captcha2_number = i[1]317 if (i[0] == "captcha2_answer"):318 captcha2_answer = i[1]319 # check_captcha_answer_query = "SELECT captcha_answer FROM captcha2 where captcha_num=captcha2_number ;"320 check_captcha_answer_query = "SELECT captcha_answer FROM captcha2 where captcha_num="+"'"+captcha2_number +"';"321 try:322 result = await query_operator(check_captcha_answer_query)323 result = result[0]324 result = result.get('captcha_answer')325 # 대문자로 입력했을 경우 소문자로 바꿔준다.326 captcha2_answer = captcha2_answer.lower()327 if captcha2_answer == result:328 result = "true"329 return result330 else:331 result = "false"332 return result333 except:334 result = "false"335 return result336 except:337 result = "false"338 return result339# 16번 요청340# 운전 정보를 넘겨주는 클래스341class GetUserInfo(RequestHandler):342 def __init__(self, data_buffer: DataBuffer):343 super().__init__(data_buffer)344 async def main(self):345 temp = self.data_buffer.get_data()346 try:347 userid = None348 temp = temp.replace("[", "", 1)349 temp = temp.replace("]", "", 1)350 temp = temp.replace(" ", "")351 temp = temp.replace("'", "")352 temp = temp.split(',')353 for i in temp:354 i = i.split('=')355 if (i[0] == "userid"):356 userid = i[1]357 get_userinfo_query="SELECT * FROM MEMBER WHERE id="+"'"+ userid +"';"358 try:359 qeury_result = await query_operator(get_userinfo_query)360 qeury_result = qeury_result[0]361 result = ""362 # 드라이브 횟수, 졸음운전 감지 횟수, 음주운전 감지 횟수, 마지막 운전 날짜363 result += str(str(qeury_result.get('drive_cnt'))+"-")364 result += str(str(qeury_result.get('sleep_detect_cnt'))+"-")365 result += str(str(qeury_result.get('alcohol_detect_cnt'))+"-")366 last_drive_date = str(str(qeury_result.get('last_drive_date')))367 last_drive_date = last_drive_date.split(" ")368 last_drive_date = last_drive_date[0]369 result += last_drive_date.replace("-", "/")370 return result371 except:372 result = "false"373 return result374 except:375 result = "false"376 return result377# 18번 요청378# 사용자의 모든 정보를 넘겨주는 클래스379class GetAllUserInfo(RequestHandler):380 def __init__(self, data_buffer: DataBuffer):381 super().__init__(data_buffer)382 async def main(self):383 temp = self.data_buffer.get_data()384 try:385 userid = None386 temp = temp.replace("[", "", 1)387 temp = temp.replace("]", "", 1)388 temp = temp.replace(" ", "")389 temp = temp.replace("'", "")390 temp = temp.split(',')391 for i in temp:392 i = i.split('=')393 if (i[0] == "userid"):394 userid = i[1]395 get_all_userinfo_query="SELECT * FROM MEMBER WHERE id="+"'" + userid + "';"396 try:397 qeury_result = await query_operator(get_all_userinfo_query)398 qeury_result = qeury_result[0]399 result = ""400 # 드라이브 횟수, 졸음운전 감지 횟수, 음주운전 감지 횟수, 마지막 운전 날짜401 result += str(str(qeury_result.get('id'))+"-")402 result += str(str(qeury_result.get('passwd'))+"-")403 result += str(str(qeury_result.get('name'))+"-")404 result += str(str(qeury_result.get('tel'))+"-")405 return result406 except:407 result = "false"408 return result409 except:410 result = "false"411 return result412# 20번 요청413# 사용자 정보를 업데이트 하는 클래스414class UpdateUserInfo(RequestHandler):415 def __init__(self, data_buffer: DataBuffer):416 super().__init__(data_buffer)417 async def main(self):418 temp = self.data_buffer.get_data()419 try:420 userid = None421 userpassword = None422 username = None423 usertel = None424 temp = temp.replace("[", "", 1)425 temp = temp.replace("]", "", 1)426 temp = temp.replace(" ", "")427 temp = temp.replace("'", "")428 temp = temp.split(',')429 for i in temp:430 i = i.split('=')431 if (i[0] == "userid"):432 userid = i[1]433 if (i[0] == "userpassword"):434 userpassword = i[1]435 if (i[0] == "username"):436 username = i[1]437 if (i[0] == "usertel"):438 usertel = i[1]439 get_primary_key ="SELECT * FROM MEMBER WHERE id="+"'" + userid + "';"440 primary_key = None441 try:442 primary_key = await query_operator(get_primary_key)443 primary_key = primary_key[0]444 primary_key = int(primary_key.get('index'))445 update_userinfo_query = "UPDATE member SET `passwd` = "+"'" + str(userpassword) + "'" + " , `name` = "+"'" + str(username) + "'" + " , `tel` = " + "'" + str(usertel) + "'" + " WHERE (`index` = " + "'" + str(primary_key) + "');"446 qeury_result = await update_execute(update_userinfo_query)447 if qeury_result:448 result = "true"449 return result450 else:451 result = "false"452 return result453 except:454 result = "false"455 return result456 except:457 result = "false"458 return result459# 22번 요청460# 마지막 운전 날짜를 업데이트 하는 함수461class UpdateLastDriveDate(RequestHandler):462 def __init__(self, data_buffer: DataBuffer):463 super().__init__(data_buffer)464 async def main(self):465 temp = self.data_buffer.get_data()466 try:467 userid = None468 temp = temp.replace("[", "", 1)469 temp = temp.replace("]", "", 1)470 temp = temp.replace(" ", "")471 temp = temp.replace("'", "")472 temp = temp.split(',')473 for i in temp:474 i = i.split('=')475 if (i[0] == "userid"):476 userid = i[1]477 get_primary_key ="SELECT * FROM MEMBER WHERE id="+"'" + userid + "';"478 primary_key = None479 try:480 primary_key = await query_operator(get_primary_key)481 temp_result = primary_key482 primary_key = primary_key[0]483 primary_key = int(primary_key.get('index'))484 temp_result = temp_result[0]485 drive_cnt = int(temp_result.get('drive_cnt'))486 drive_cnt += 1487 new_last_drive_date = (datetime.datetime.now()).strftime('%y-%m-%d')488 update_userinfo_query = "UPDATE member SET `drive_cnt` = "+"'" + str(drive_cnt) + "'" + " , `last_drive_date` = "+"'" + str(new_last_drive_date) + "'" + " WHERE (`index` = " + "'" + str(primary_key) + "');"489 qeury_result = await update_execute(update_userinfo_query)490 if qeury_result:491 result = "true"492 return result493 else:494 result = "false"495 return result496 except:497 result = "false"498 return result499 except:500 result = "false"501 return result502# 24번 요청503# 테스트 실패시에 음주운전 횟수 1회 증가 시킴504class UpdateAlcoholCount(RequestHandler):505 def __init__(self, data_buffer: DataBuffer):506 super().__init__(data_buffer)507 async def main(self):508 temp = self.data_buffer.get_data()509 try:510 userid = None511 temp = temp.replace("[", "", 1)512 temp = temp.replace("]", "", 1)513 temp = temp.replace(" ", "")514 temp = temp.replace("'", "")515 temp = temp.split(',')516 for i in temp:517 i = i.split('=')518 if (i[0] == "userid"):519 userid = i[1]520 get_primary_key ="SELECT * FROM MEMBER WHERE id="+"'" + userid + "';"521 primary_key = None522 try:523 primary_key = await query_operator(get_primary_key)524 temp_result = primary_key525 primary_key = primary_key[0]526 primary_key = int(primary_key.get('index'))527 temp_result = temp_result[0]528 alcohol_detect_count = int(temp_result.get('alcohol_detect_cnt'))529 alcohol_detect_count += 1530 # update_userinfo_query = "UPDATE member SET `alcohol_detect_cnt` = "+"'" + str(alcohol_detect_count) + "'" + " WHERE (`index` = " + "'" + str(primary_key) + "');"531 update_userinfo_query = "UPDATE `test`.`member` SET `alcohol_detect_cnt` = '" + str(alcohol_detect_count) + "' WHERE(`index` = '" + str(primary_key) + "');"532 qeury_result = await update_execute(update_userinfo_query)533 if qeury_result:534 result = "true"535 return result536 else:537 result = "false"538 return result539 except:540 result = "false"541 return result542 except:543 result = "false"544 return result545# 26번 요청546# 사용자의 아이디를 찾아 반환함547class FindUserId(RequestHandler):548 def __init__(self, data_buffer: DataBuffer):549 super().__init__(data_buffer)550 async def main(self):551 temp = self.data_buffer.get_data()552 try:553 username = None554 usertel = None555 temp = temp.replace("[", "", 1)556 temp = temp.replace("]", "", 1)557 temp = temp.replace(" ", "")558 temp = temp.replace("'", "")559 temp = temp.split(',')560 for i in temp:561 i = i.split('=')562 if (i[0] == "username"):563 username = i[1]564 if (i[0] == "usertel"):565 usertel = i[1]566 get_all_userinfo_query = "SELECT * FROM MEMBER WHERE name=" + "'" + username + "'" + " AND tel=" + "'" + usertel + "'" + "; "567 try:568 qeury_result = await query_operator(get_all_userinfo_query)569 qeury_result = qeury_result[0]570 result = ""571 # 보낸 이름과 전화번호가 저장된 이름과 전화번호화 일치하는지 점검572 if username == str(qeury_result.get('name')) and usertel == str(qeury_result.get('tel')):573 result += str(qeury_result.get('id'))574 else:575 result += "false"576 return result577 except:578 result = "false"579 return result580 except:581 result = "false"582 return result583# 28번 요청584# 사용자의 비밀번호를 찾아 반환함585class FindUserPw(RequestHandler):586 def __init__(self, data_buffer: DataBuffer):587 super().__init__(data_buffer)588 async def main(self):589 temp = self.data_buffer.get_data()590 try:591 userid = None592 username = None593 temp = temp.replace("[", "", 1)594 temp = temp.replace("]", "", 1)595 temp = temp.replace(" ", "")596 temp = temp.replace("'", "")597 temp = temp.split(',')598 for i in temp:599 i = i.split('=')600 if (i[0] == "userid"):601 userid = i[1]602 if (i[0] == "username"):603 username = i[1]604 get_all_userinfo_query = "SELECT * FROM MEMBER WHERE name=" + "'" + username + "'" + " AND id=" + "'" + userid + "'" + "; "605 try:606 qeury_result = await query_operator(get_all_userinfo_query)607 qeury_result = qeury_result[0]608 result = ""609 # 보낸 이름과 전화번호가 저장된 이름과 전화번호화 일치하는지 점검610 if userid == str(qeury_result.get('id')) and username == str(qeury_result.get('name')):611 result += str(qeury_result.get('passwd'))612 else:613 result += "false"614 return result615 except:616 result = "false"617 return result618 except:619 result = "false"...

Full Screen

Full Screen

test_client.py

Source:test_client.py Github

copy

Full Screen

1#!/usr/bin/python2##3## Copyright 2008, Various4## Adrian Likins <alikins@redhat.com>5##6## This software may be freely redistributed under the terms of the GNU7## general public license.8##9import os10import socket11import unittest12import xmlrpclib13import func.overlord.client as fc14import func.utils15import socket16class BaseTest:17 # assume we are talking to localhost18 th = socket.gethostname()19 nforks=120 async=False21 def __init__(self):22 pass23 def setUp(self):24 self.overlord = fc.Overlord(self.th,25 nforks=self.nforks,26 async=self.async)27 def test_module_version(self):28 mod = getattr(self.overlord, self.module)29 result = mod.module_version()30 self.assert_on_fault(result)31 def test_module_api_version(self):32 mod = getattr(self.overlord, self.module)33 result = mod.module_api_version()34 self.assert_on_fault(result)35 def test_module_description(self):36 mod = getattr(self.overlord, self.module)37 result = mod.module_description()38 self.assert_on_fault(result)39 def test_module_list_methods(self):40 mod = getattr(self.overlord, self.module)41 result = mod.list_methods()42 self.assert_on_fault(result)43 def test_module_get_method_args(self):44 mod = getattr(self.overlord,self.module)45 arg_result=mod.get_method_args()46 self.assert_on_fault(arg_result)47 def test_module_inventory(self):48 mod = getattr(self.overlord, self.module)49 result = mod.list_methods()50 res = result[self.th]51 # not all modules have an inventory, so check for it52 # FIXME: not real happy about having to call list method for53 # every module, but it works for now -akl54 if "inventory" in res:55 result = mod.inventory()56 self.assert_on_fault(result)57 # we do this all over the place...58 def assert_on_fault(self, result):59 assert func.utils.is_error(result[self.th]) == False60# assert type(result[self.th]) != xmlrpclib.Fault61 def assert_on_no_fault(self, results):62 assert func.utils.is_error(results[self.th]) == True63 # attrs set so we can skip these via nosetest64 test_module_version.intro = True65 test_module_api_version.intro = True66 test_module_description.intro = True67 test_module_list_methods.intro = True68 test_module_inventory.intro = True69 test_module_get_method_args.intro = True70class TestTest(BaseTest):71 module = "test"72 def test_add(self):73 result = self.overlord.test.add(1,5)74 self.assert_on_fault(result)75 assert result[self.th] == 676 def test_add_string(self):77 result = self.overlord.test.add("foo", "bar")78 self.assert_on_fault(result)79 assert result[self.th] == "foobar"80 def test_sleep(self):81 result = self.overlord.test.sleep(1)82 self.assert_on_fault(result)83 def test_explode(self):84 results = self.overlord.test.explode()85 print results86 self.assert_on_no_fault(results)87 def test_explode_no_string(self):88 results = self.overlord.test.explode_no_string()89 print results90 self.assert_on_no_fault(results)91 def _echo_test(self, data):92 result = self.overlord.test.echo(data)93 self.assert_on_fault(result)94 assert result[self.th] == data95 # this tests are basically just to test the basic96 # marshalling/demarshaling bits97 def test_echo_int(self):98 self._echo_test(1)99 def test_echo_string(self):100 self._echo_test("caneatcheese")101 def test_echo_array(self):102 self._echo_test([1, 2, "three", "fore", "V",])103 def test_echo_hash(self):104 self._echo_test({"one":1, "too":2, "III":3, "many":8, "lots":12312})105 def test_echo_bool_false(self):106 self._echo_test(False)107 def test_echo_bool_true(self):108 self._echo_test(True)109 def test_echo_float(self):110 self._echo_test(123.456)111 def test_echo_big_float(self):112 self._echo_test(123121232.23)113 def test_echo_bigger_float(self):114 self._echo_test(234234234234234234234.234234234234234)115 def test_echo_little_float(self):116 self._echo_test(0.000000000000000000000000000000037)117 def test_echo_binary(self):118 blob = "348dshke354ts0d9urgk"119 import xmlrpclib120 data = xmlrpclib.Binary(blob)121 self._echo_test(data)122 def test_echo_date(self):123 import datetime124 dt = datetime.datetime(1974, 1, 5, 11, 59 ,0,0, None)125 import xmlrpclib126 data = xmlrpclib.DateTime(dt)127 self._echo_test(data)128 def test_config(self):129 result = self.overlord.test.configfoo()130 config = result[self.th]131 self.assert_on_fault(result)132 def test_config_write(self):133 result = self.overlord.test.config_save()134 config = result[self.th]135 self.assert_on_fault(result)136 def test_config_set(self):137 result = self.overlord.test.config_set("example", 5000)138 config = result[self.th]139 self.assert_on_fault(result)140 def test_config_get_example(self):141 result = self.overlord.test.config_get("example")142 config = result[self.th]143 self.assert_on_fault(result)144 def test_config_get_string(self):145 result = self.overlord.test.config_get("string_option")146 option = result[self.th]147 print option148 assert(type(option) == type("foo"))149 self.assert_on_fault(result)150 def test_config_get_int(self):151 result = self.overlord.test.config_get("int_option")152 option = result[self.th]153 assert(type(option) == type(1))154 self.assert_on_fault(result)155 def test_config_get_bool(self):156 result = self.overlord.test.config_get("bool_option")157 option = result[self.th]158 assert(type(option) == type(True))159 self.assert_on_fault(result)160 def test_config_get_float(self):161 result = self.overlord.test.config_get("float_option")162 option = result[self.th]163 assert(type(option) == type(2.71828183))164 self.assert_on_fault(result)165 def test_config_get_test(self):166 result = self.overlord.test.config_get_test()167 self.assert_on_fault(result)168class TestCommand(BaseTest):169 module = "command"170 def test_echo(self):171 result = self.overlord.command.run("echo -n foo")172 self.assert_on_fault(result)173 assert result[self.th][1] == "foo"174 def test_rpm(self):175 # looksing for some package that should be there, rh specific176 # ish at the moment177 result = self.overlord.command.run("rpm -q filesystem")178 self.assert_on_fault(result)179 assert result[self.th][1].split("-")[0] == "filesystem"180 def test_env(self):181 result = self.overlord.command.run("env",182 {'BLIPPYFOO':'awesome'})183 self.assert_on_fault(result)184 assert result[self.th][1].find("BLIPPYFOO=awesome") != -1185# def test_sleep_long(self):186# result = self.overlord.command.run("sleep 256")187# self.assert_on_fault(result)188 def test_shell_globs(self):189 result = self.overlord.command.run("ls /etc/cron*")190 self.assert_on_fault(result)191 def test_shell_pipe(self):192 result = self.overlord.command.run("echo 'foobar' | grep foo")193 self.assert_on_fault(result)194 assert result[self.th][1] == "foobar\n"195 def test_shell_redirect(self):196 result = self.overlord.command.run("mktemp")197 tmpfile = result[self.th][1].strip()198 result = self.overlord.command.run("echo foobar >> %s; cat %s" % (tmpfile, tmpfile))199 self.assert_on_fault(result)200 assert result[self.th][1] == "foobar\n"201 def test_shell_multiple_commands(self):202 result = self.overlord.command.run("cal; date; uptime; ls;")203 self.assert_on_fault(result)204class TestCopyfile(BaseTest):205 fn = "/tmp/func_test_file"206 dest_fn = "/tmp/func_test_file_dest"207 content = "this is a func test file"208 module = "copyfile"209 def create_a_file(self, size=1):210 f = open(self.fn, "w")211 f.write(self.content*size)212 f.close()213# def test_local_copyfile(self):214# result = self.overlord.local.copyfile.send(self.fn, self.dest_fn)215# print result216# self.assert_on_fault(result)217 def test_copyfile(self, size=1):218 self.create_a_file(size=size)219 fb = open(self.fn,"r").read()220 data = xmlrpclib.Binary(fb)221 result = self.overlord.copyfile.copyfile(self.dest_fn, data)222 self.assert_on_fault(result)223 assert result[self.th] == 0224 # def test_copyfile_big(self):225 # # make a file in the ~70 meg range226 # self.test_copyfile(size=100)227 def test_checksum(self):228 self.create_a_file()229 fb = open(self.fn,"r").read()230 data = xmlrpclib.Binary(fb)231 result = self.overlord.copyfile.copyfile(self.dest_fn, data)232 result = self.overlord.copyfile.checksum(self.dest_fn)233 self.assert_on_fault(result)234 assert result[self.th] == "b36a8040e44c16605d7784cdf1b3d9ed04ea7f55"235class TestHardware(BaseTest):236 module = "hardware"237 def test_inventory(self):238 result = self.overlord.hardware.inventory()239 self.assert_on_fault(result)240 def test_halinfo(self):241 result = self.overlord.hardware.hal_info()242 self.assert_on_fault(result)243 def test_info(self):244 result = self.overlord.hardware.info()245 self.assert_on_fault(result)246 def test_info_no_devices(self):247 result = self.overlord.hardware.info(False)248 self.assert_on_fault(result)249class TestFileTracker(BaseTest):250 fn = "/etc/hosts"251 fn_glob = "/etc/init.d/*"252 fn_list = ["/etc/hosts", "/etc/func/minion.conf"]253 fn_glob_list = ["/etc/hosts", "/etc/func/*"]254 module = "filetracker"255 def test_track(self):256 result = self.overlord.filetracker.track(self.fn)257 assert result[self.th] == 1258 self.assert_on_fault(result)259 def test_track_glob(self):260 result = self.overlord.filetracker.track(self.fn)261 assert result[self.th] == 1262 self.assert_on_fault(result)263 def test_untrack_glob(self):264 result = self.overlord.filetracker.track(self.fn_glob)265 result = self.overlord.filetracker.untrack(self.fn_glob)266 self.assert_on_fault(result)267 def test_track_fn_list(self):268 result = self.overlord.filetracker.track(self.fn_list)269 assert result[self.th] == 1270 self.assert_on_fault(result)271 def test_untrack_fn_list(self):272 result = self.overlord.filetracker.track(self.fn_list)273 result = self.overlord.filetracker.untrack(self.fn_list)274 self.assert_on_fault(result)275 def test_track_fn_glob_list(self):276 result = self.overlord.filetracker.track(self.fn_glob_list)277 assert result[self.th] == 1278 self.assert_on_fault(result)279 def test_untrack_fn_glob_list(self):280 result = self.overlord.filetracker.track(self.fn_glob_list)281 result = self.overlord.filetracker.untrack(self.fn_glob_list)282 self.assert_on_fault(result)283 def test_inventory(self):284 result = self.overlord.filetracker.track(self.fn)285 result = self.overlord.filetracker.inventory(False)286 self.assert_on_fault(result)287 assert self.fn in result[self.th][0]288# assert result[self.th][0][3] == 0289 def test_untrack(self):290 result = self.overlord.filetracker.track(self.fn)291 result = self.overlord.filetracker.untrack(self.fn)292 self.assert_on_fault(result)293 result_inv = self.overlord.filetracker.inventory(False)294 tracked_files = result_inv[self.th]295 for i in tracked_files:296 if i[0] == self.fn:297 assert "%s was not properly untracked" % self.fn298class TestMount(BaseTest):299 module = "mount"300 def test_mount_list(self):301 result = self.overlord.mount.list()302 self.assert_on_fault(result)303 # INSERT some clever way to test mount here304class TestNetworkTest(BaseTest):305 module = "networktest"306 def test_ping(self):307 result = self.overlord.networktest.ping(self.th, "-c", "2")308 self.assert_on_fault(result)309 def test_ping_bad_arg(self):310 result = self.overlord.networktest.ping(self.th)311 # this should give us a FuncException312 foo = func.utils.is_error(result[self.th])313 def test_netstat(self):314 result = self.overlord.networktest.netstat("-n")315 self.assert_on_fault(result)316 def test_traceroute(self):317 result = self.overlord.networktest.traceroute(self.th)318 self.assert_on_fault(result)319 def test_dig(self):320 result = self.overlord.networktest.dig("redhat.com")321 self.assert_on_fault(result)322 def test_isportopen_closed_port(self):323 result = self.overlord.networktest.isportopen(self.th, 34251)324 self.assert_on_fault(result)325 def test_isportopen_open_port(self):326 result = self.overlord.networktest.isportopen(self.th, 51234)327 self.assert_on_fault(result)328class TestProcess(BaseTest):329 module = "process"330 def test_info(self):331 result = self.overlord.process.info()332 self.assert_on_fault(result)333 def test_mem(self):334 result = self.overlord.process.mem()335 self.assert_on_fault(result)336 # FIXME: how to test kill/pkill? start a process with337 # command and then kill it?338class TestService(BaseTest):339 module = "service"340 def test_inventory(self):341 result = self.overlord.service.inventory()342 self.assert_on_fault(result)343 def test_get_enabled(self):344 result = self.overlord.service.get_enabled()345 self.assert_on_fault(result)346 def test_get_running(self):347 result = self.overlord.service.get_running()348 self.assert_on_fault(result)349 def test_get_status(self):350 running_data = self.overlord.service.get_running()[self.th]351 result = self.overlord.service.status(running_data[0][0])352 self.assert_on_fault(result)353 #FIXME: whats a good way to test starting/stoping services without354 # doing bad things? -akl355class TestRpm(BaseTest):356 module = "rpms"357 def test_inventory(self):358 result = self.overlord.rpms.inventory()359 self.assert_on_fault(result)360 def test_glob(self):361 # if func is running, there should at least be python installed ;->362 result = self.overlord.rpms.glob("python*", False)363 self.assert_on_fault(result)364 def test_glob_flatten(self):365 result = self.overlord.rpms.glob("python*", True)366 self.assert_on_fault(result)367 def test_glob_nomatch(self):368 # shouldn't be any rpms called "-" ;->369 result = self.overlord.rpms.glob("-*")370 self.assert_on_fault(result)371 def test_glob_gpg_pubkey(self):372 # gpg-pubkey packages are weird rpm virtual packages, and tend to do373 # weird things, so try that too374 result = self.overlord.rpms.glob("gpg-pubkey*")375 self.assert_on_fault(result)376 def test_glob_gpg_pubkey_no_flat(self):377 # gpg-pubkey packages are weird rpm virtual packages, and tend to do378 # weird things, so try that too379 result = self.overlord.rpms.glob("gpg-pubkey*", False)380 self.assert_on_fault(result)381 def test_glob_match_all(self):382 result = self.overlord.rpms.glob("*", False)383 self.assert_on_fault(result)384 def test_verify(self):385 result = self.overlord.rpms.verify("bash")386 self.assert_on_fault(result)387class TestSmart(BaseTest):388 module = "smart"389 def test_info(self):390 result = self.overlord.smart.info()391 self.assert_on_fault(result)392class TestSysctl(BaseTest):393 module = "sysctl"394 def test_list(self):395 result = self.overlord.sysctl.list()396 self.assert_on_fault(result)397 def test_get(self):398 result = self.overlord.sysctl.get("kernel.max_lock_depth")399 self.assert_on_fault(result)400class TestYum(BaseTest):401 module = "yumcmd"402 def test_check_update(self):403 result = self.overlord.yumcmd.check_update()404 self.assert_on_fault(result)405 def test_check_update_empty_filter(self):406 results = self.overlord.yumcmd.check_update([])407 self.assert_on_fault(results)408 results_no_filter = self.overlord.yumcmd.check_update()409 assert results == results_no_filter410 def test_check_update_splat_filter(self):411 results = self.overlord.yumcmd.check_update(['*'])412 self.assert_on_fault(results)413 results_no_filter = self.overlord.yumcmd.check_update()414 assert results == results_no_filter415# this fails on fc6, need to test on newer yum to see whats up416# def test_update_non_existent_package(self):417# result = self.overlord.yumcmd.update("thisisapackage-_-that_should==never+exist234234234")418# self.assert_on_fault(result)419# # hmm, that method always returns True... not much to test there... -akl420class TestIptables(BaseTest):421 module = "iptables"422 def test_dump(self):423 result = self.overlord.iptables.dump()424 # at the moment, we dont set anything up425 # to verify, so this is just a basic426 # "does it crash" test427 def test_policy(self):428 result = self.overlord.iptables.policy()429class TestIptablesPort(BaseTest):430 module = "iptables.port"431 def test_inventory(self):432 results = self.overlord.iptables.port.inventory()433 # doesnt have an inventory, so er... -akl434#class TestHttpd(BaseTest):435# module = "httpd"436# def test_server_status(self):437# result = self.overlord.httpd.server_status()438# self.assert_on_fault(result)439#440# def test_graceful(self):441# result = self.overlord.httpd.graceful()442# self.assert_on_fault(result)443class TestEchoTest(BaseTest):444 module = "echo"445 def test_run_string(self):446 result=self.overlord.echo.run_string("heyman")447 self.assert_on_fault(result)448 def test_run_int(self):449 result=self.overlord.echo.run_int(12)450 self.assert_on_fault(result)451 def test_run_float(self):452 result=self.overlord.echo.run_float(12.0)453 self.assert_on_fault(result)454 def test_run_options(self):455 result=self.overlord.echo.run_options("hehehh")456 self.assert_on_fault(result)457 def test_run_list(self):458 result=self.overlord.echo.run_list(['one','two','three'])459 self.assert_on_fault(result)460 def test_run_hash(self):461 result=self.overlord.echo.run_hash({'one':1,'two':2})462 self.assert_on_fault(result)463 def test_run_boolean(self):464 result=self.overlord.echo.run_hash(True)465 self.assert_on_fault(result)466class TestFactsModule(BaseTest):467 module = "fact"468 def test_list_fact_modules(self):469 result=self.overlord.fact.list_fact_modules()470 print result471 self.assert_on_fault(result)472 def test_list_fact_methods(self):473 # we want to catch the case if haveconflicts ...474 result=self.overlord.fact.list_fact_methods(True)475 print result476 self.assert_on_fault(result)477 if type(result[self.th])==dict and result[self.th].has_key('__conflict__'):478 raise Exception("You have conflict in tags run fact.list_fact_methods for more info : %s"%result)479 def test_show_fact_module(self):480 modules = self.overlord.fact.list_fact_modules().values()481 print "Modules to run for show fact module are ",modules482 for module in modules[0]:483 result = self.overlord.fact.show_fact_module(module)484 print result485 self.assert_on_fault(result)486 def test_show_fact_method(self):487 methods = self.overlord.fact.list_fact_methods().values()488 print "Methods to run for show fact method are ",methods489 for method in methods[0]:490 result = self.overlord.fact.show_fact_method(method)491 print result492 self.assert_on_fault(result)493 def test_fact_call(self):494 methods = self.overlord.fact.list_fact_methods().values()495 for method in methods[0]:496 result = self.overlord.fact.call_fact(method)497 print result498 self.assert_on_fault(result)499class TestSystem(BaseTest):500 module = "system"501 def test_list_methods(self):502 result = self.overlord.system.list_methods()503 self.assert_on_fault(result)504 def test_listMethods(self):505 result = self.overlord.system.listMethods()506 self.assert_on_fault(result)507 def test_list_modules(self):508 result = self.overlord.system.list_modules()509 self.assert_on_fault(result)510 #FIXME: we really should just implement these for the system stuff511 # as well512 def test_module_version(self):513 pass514 def test_module_api_version(self):515 pass516 def test_module_description(self):517 pass518 def test_module_get_method_args(self):519 pass520#import time521#class TestAsyncTest(BaseTest):522# module = "async.test"523# nforks=1524# async=True525# def test_sleep_async(self):526# job_id = self.overlord.test.sleep(5)527# print "job_id", job_id528# time.sleep(5)529# (return_code, results) = self.overlord.job_status(job_id)530# print "return_code", return_code531# print "results", results532#533# def test_add_async(self):534# job_id = self.overlord.test.add(1,5)535# print "job_id", job_id536# time.sleep(6)537# (return_code, results) = self.overlord.job_status(job_id)538# print "return_code", return_code...

Full Screen

Full Screen

Tilte_minmax.py

Source:Tilte_minmax.py Github

copy

Full Screen

1from vic import *2import requests3import threading4import time5import json67lock = threading.Lock()8Gas_dict={}9Gas_param = {}10Gas_return = {}11121314def mainLoop(info, arr):15 16 ID_string = str(PAGE01.OCR337).replace(' ', '')17 char_list = split(ID_string)18 colon_list = []19 slash_list = []20 E_ID = ''21 U_ID = ''22 print(char_list)23 24 for i in range(len(char_list)):25 if char_list[i] == ':':26 colon_list.append(i)27 if char_list[i] == '/':28 slash_list.append(i)29 30 for i in range(colon_list[0] + 1, slash_list[0]):31 E_ID += char_list[i]3233 print(E_ID)34 35 print(colon_list)36 print(slash_list)37 38 39 pass4041def split(word):42 return[char for char in word]4344def SetGasParam(**kwargs):45 46 global Gas_param, lock47 48 Gas_param_temp = {}49 50 try:51 E_ID = kwargs['E_ID']52 Gas_param_temp['E_ID'] = E_ID53 except:54 return {'error_message':'Cannot find E_ID.', 'is_success':'N'}55 56 try:57 U_ID = kwargs['U_ID']58 Gas_param_temp['U_ID'] = U_ID59 except:60 return {'error_message':'Cannot find U_ID.', 'is_success':'N'}61 62 try:63 O_ID = kwargs['O_ID']64 Gas_param_temp['O_ID'] = O_ID65 except:66 return {'error_message':'Cannot find O_ID.', 'is_success':'N'}67 68 try:69 F_ID = kwargs['F_ID']70 Gas_param_temp['F_ID'] = F_ID71 except:72 return {'error_message':'Cannot find F_ID.', 'is_success':'N'}73 74 try:75 Gas_set = kwargs['Gas_set']76 Gas_param_temp['Gas_set'] = Gas_set77 except:78 return {'error_message':'Cannot find Gas_set.', 'is_success':'N'}79 80 81 lock.acquire()82 83 Gas_param[F_ID] = json.dumps(Gas_param_temp)84 85 lock.release()86 87 #print(Gas_param)88 89 return {'is_success':'Y'}90 9192 93def RecognizeImage(**args):94 95 global Gas_dict, Gas_param, Gas_return, lock9697 try:98 img = args['img']99 except:100 return {'error_message':'Cannot find img.', 'is_success':'N'}101 102 Gas_dict_temp = {}103 Gas_return_temp = {}104 find = 0105 result = {}106 107 E_ID = ''108 U_ID = ''109 F_ID = ''110 ID = ''111 ID_temp = ''112 ID_string = ''113 char_list = []114 colon_list = []115 slash_list = []116 117 '''118 x1 = int(result['PATTERN01'].X)+int(result['PATTERN01'].WIDTH)119 y1 = int(result['PATTERN01'].Y)120 w1 = int(result['PATTERN02'].X)-x1121 h1 = int(result['PATTERN01'].HEIGHT)122 x2 = int(result['PATTERN03'].X)+int(result['PATTERN03'].WIDTH)123 y2 = int(result['PATTERN03'].Y)124 w2 = 100125 h2 = int(result['PATTERN03'].HEIGHT)126 E_ID = TOOL.OCR(0, 'epmm_ID', x1, y1, w1, h1, img, {'segmentation':7, 'preprocess_resize':7, 'preprocess_resize_method': 0, 'preprocess_threshold_method':1, 'preprocess_threshold_algorithm': 0, 'preprocess_threshold_value': 135})127 U_ID = TOOL.OCR(0, 'epmm_ID', x2, y2, w2, h2, img, {'segmentation':7, 'preprocess_resize':7, 'preprocess_resize_method': 0, 'preprocess_threshold_method':1, 'preprocess_threshold_algorithm': 0, 'preprocess_threshold_value': 135}) 128 E_ID = str(E_ID).replace(' ', '')129 U_ID = str(U_ID).replace(' ', '')130 '''131 132 '''133 E_ID = args['E_ID']134 U_ID = args['U_ID']135 '''136 137 ID_string = TOOL.OCR(0, 'ID_test', 127,0,1393,22, img, {'segmentation':7, 'preprocess_resize':5, 'preprocess_resize_method': 0, 'preprocess_threshold_method':1, 'preprocess_threshold_algorithm': 1, 'preprocess_threshold_value': 135})138 ID_string = ID_string.replace(' ', '')139 char_list = split(ID_string)140 141 for i in range(len(char_list)):142 if char_list[i] == ':':143 colon_list.append(i)144 if char_list[i] == '/':145 slash_list.append(i)146 147 for i in range(colon_list[0] + 1, slash_list[0]):148 E_ID += char_list[i]149 150 for i in range(colon_list[3] + 1, len(char_list)):151 U_ID += char_list[i]152 153 ID = E_ID + U_ID154 155 for key in Gas_param.keys():156 ID_temp = ''157 ID_temp = json.loads(Gas_param[key])['E_ID'] + json.loads(Gas_param[key])['U_ID']158 if ID_temp == ID:159 find = 1160 F_ID = key161 162 if find == 1:163 164 result = PAGE_PROCESS('01', img)165 166 Gas1_name=''167 Gas1_set=[]168 Gas1_error=[]169 Gas1_dict={}170 Gas1_name=result['OCR01']171 Gas1_set=[result['OCR02'],172 result['OCR03'],173 result['OCR04'],174 result['OCR05'],175 result['OCR06'],176 result['OCR07'],177 result['OCR08'],178 result['OCR09'],179 result['OCR10'],180 result['OCR11']181 ]182 Gas1_error=[result['OCR12'],183 result['OCR13'],184 result['OCR14'],185 result['OCR15'],186 result['OCR16'],187 result['OCR17'],188 result['OCR18'],189 result['OCR19'],190 result['OCR20'],191 result['OCR21']192 ]193194 Gas2_name=''195 Gas2_set=[]196 Gas2_error=[]197 Gas2_dict={}198 Gas2_name=result['OCR22']199 Gas2_set=[result['OCR23'],200 result['OCR24'],201 result['OCR25'],202 result['OCR26'],203 result['OCR27'],204 result['OCR28'],205 result['OCR29'],206 result['OCR30'],207 result['OCR31'],208 result['OCR32']209 ]210 Gas2_error=[result['OCR33'],211 result['OCR34'],212 result['OCR35'],213 result['OCR36'],214 result['OCR37'],215 result['OCR38'],216 result['OCR39'],217 result['OCR40'],218 result['OCR41'],219 result['OCR42']220 ]221222 Gas3_name=''223 Gas3_set=[]224 Gas3_error=[]225 Gas3_dict={}226 Gas3_name=result['OCR43']227 Gas3_set=[result['OCR44'],228 result['OCR45'],229 result['OCR46'],230 result['OCR47'],231 result['OCR48'],232 result['OCR49'],233 result['OCR50'],234 result['OCR51'],235 result['OCR52'],236 result['OCR53']237 ]238 Gas3_error=[result['OCR54'],239 result['OCR55'],240 result['OCR56'],241 result['OCR57'],242 result['OCR58'],243 result['OCR59'],244 result['OCR60'],245 result['OCR61'],246 result['OCR62'],247 result['OCR63']248 ]249250 Gas4_name=''251 Gas4_set=[]252 Gas4_error=[]253 Gas4_dict={}254 Gas4_name=result['OCR64']255 Gas4_set=[result['OCR65'],256 result['OCR66'],257 result['OCR67'],258 result['OCR68'],259 result['OCR69'],260 result['OCR70'],261 result['OCR71'],262 result['OCR72'],263 result['OCR73'],264 result['OCR74']265 ]266 Gas4_error=[result['OCR75'],267 result['OCR76'],268 result['OCR77'],269 result['OCR78'],270 result['OCR79'],271 result['OCR80'],272 result['OCR81'],273 result['OCR82'],274 result['OCR83'],275 result['OCR84']276 ]277278 Gas5_name=''279 Gas5_set=[]280 Gas5_error=[]281 Gas5_dict={}282 Gas5_name=result['OCR85']283 Gas5_set=[result['OCR86'],284 result['OCR87'],285 result['OCR88'],286 result['OCR89'],287 result['OCR90'],288 result['OCR91'],289 result['OCR92'],290 result['OCR93'],291 result['OCR94'],292 result['OCR95']293 ]294 Gas5_error=[result['OCR96'],295 result['OCR97'],296 result['OCR98'],297 result['OCR99'],298 result['OCR100'],299 result['OCR101'],300 result['OCR102'],301 result['OCR103'],302 result['OCR104'],303 result['OCR105']304 ]305306 Gas6_name=''307 Gas6_set=[]308 Gas6_error=[]309 Gas6_dict={}310 Gas6_name=result['OCR106']311 Gas6_set=[result['OCR107'],312 result['OCR108'],313 result['OCR109'],314 result['OCR110'],315 result['OCR111'],316 result['OCR112'],317 result['OCR113'],318 result['OCR114'],319 result['OCR115'],320 result['OCR116']321 ]322 Gas6_error=[result['OCR117'],323 result['OCR118'],324 result['OCR119'],325 result['OCR120'],326 result['OCR121'],327 result['OCR122'],328 result['OCR123'],329 result['OCR124'],330 result['OCR125'],331 result['OCR126']332 ]333334 Gas7_name=''335 Gas7_set=[]336 Gas7_error=[]337 Gas7_dict={}338 Gas7_name=result['OCR127']339 Gas7_set=[result['OCR128'],340 result['OCR129'],341 result['OCR130'],342 result['OCR131'],343 result['OCR132'],344 result['OCR133'],345 result['OCR134'],346 result['OCR135'],347 result['OCR136'],348 result['OCR137']349 ]350 Gas7_error=[result['OCR138'],351 result['OCR139'],352 result['OCR140'],353 result['OCR141'],354 result['OCR142'],355 result['OCR143'],356 result['OCR144'],357 result['OCR145'],358 result['OCR146'],359 result['OCR147']360 ]361362 Gas8_name=''363 Gas8_set=[]364 Gas8_error=[]365 Gas8_dict={}366 Gas8_name=result['OCR148']367 Gas8_set=[result['OCR149'],368 result['OCR150'],369 result['OCR151'],370 result['OCR152'],371 result['OCR153'],372 result['OCR154'],373 result['OCR155'],374 result['OCR156'],375 result['OCR157'],376 result['OCR158']377 ]378 Gas8_error=[result['OCR159'],379 result['OCR160'],380 result['OCR161'],381 result['OCR162'],382 result['OCR163'],383 result['OCR164'],384 result['OCR165'],385 result['OCR166'],386 result['OCR167'],387 result['OCR168']388 ]389390 Gas9_name=''391 Gas9_set=[]392 Gas9_error=[]393 Gas9_dict={}394 Gas9_name=result['OCR169']395 Gas9_set=[result['OCR170'],396 result['OCR171'],397 result['OCR172'],398 result['OCR173'],399 result['OCR174'],400 result['OCR175'],401 result['OCR176'],402 result['OCR177'],403 result['OCR178'],404 result['OCR179']405 ]406 Gas9_error=[result['OCR180'],407 result['OCR181'],408 result['OCR182'],409 result['OCR183'],410 result['OCR184'],411 result['OCR185'],412 result['OCR186'],413 result['OCR187'],414 result['OCR188'],415 result['OCR189']416 ]417418 Gas10_name=''419 Gas10_set=[]420 Gas10_error=[]421 Gas10_dict={}422 Gas10_name=result['OCR190']423 Gas10_set=[result['OCR191'],424 result['OCR192'],425 result['OCR193'],426 result['OCR194'],427 result['OCR195'],428 result['OCR196'],429 result['OCR197'],430 result['OCR198'],431 result['OCR199'],432 result['OCR200']433 ]434 Gas10_error=[result['OCR201'],435 result['OCR202'],436 result['OCR203'],437 result['OCR204'],438 result['OCR205'],439 result['OCR206'],440 result['OCR207'],441 result['OCR208'],442 result['OCR209'],443 result['OCR210']444 ]445446 Gas11_name=''447 Gas11_set=[]448 Gas11_error=[]449 Gas11_dict={}450 Gas11_name=result['OCR211']451 Gas11_set=[result['OCR212'],452 result['OCR213'],453 result['OCR214'],454 result['OCR215'],455 result['OCR216'],456 result['OCR217'],457 result['OCR218'],458 result['OCR219'],459 result['OCR220'],460 result['OCR221']461 ]462 Gas11_error=[result['OCR222'],463 result['OCR223'],464 result['OCR224'],465 result['OCR225'],466 result['OCR226'],467 result['OCR227'],468 result['OCR228'],469 result['OCR229'],470 result['OCR230'],471 result['OCR231']472 ]473474 Gas12_name=''475 Gas12_set=[]476 Gas12_error=[]477 Gas12_dict={}478 Gas12_name=result['OCR232']479 Gas12_set=[result['OCR233'],480 result['OCR234'],481 result['OCR235'],482 result['OCR236'],483 result['OCR237'],484 result['OCR238'],485 result['OCR239'],486 result['OCR240'],487 result['OCR241'],488 result['OCR242']489 ]490 Gas12_error=[result['OCR243'],491 result['OCR244'],492 result['OCR245'],493 result['OCR246'],494 result['OCR247'],495 result['OCR248'],496 result['OCR249'],497 result['OCR250'],498 result['OCR251'],499 result['OCR252']500 ]501502 Gas13_name=''503 Gas13_set=[]504 Gas13_error=[]505 Gas13_dict={}506 Gas13_name=result['OCR253']507 Gas13_set=[result['OCR254'],508 result['OCR255'],509 result['OCR256'],510 result['OCR257'],511 result['OCR258'],512 result['OCR259'],513 result['OCR260'],514 result['OCR261'],515 result['OCR262'],516 result['OCR263']517 ]518 Gas13_error=[result['OCR264'],519 result['OCR265'],520 result['OCR266'],521 result['OCR267'],522 result['OCR268'],523 result['OCR269'],524 result['OCR270'],525 result['OCR271'],526 result['OCR272'],527 result['OCR273']528 ]529530 Gas14_name=''531 Gas14_set=[]532 Gas14_error=[]533 Gas14_dict={}534 Gas14_name=result['OCR274']535 Gas14_set=[result['OCR275'],536 result['OCR276'],537 result['OCR277'],538 result['OCR278'],539 result['OCR279'],540 result['OCR280'],541 result['OCR281'],542 result['OCR282'],543 result['OCR283'],544 result['OCR284']545 ]546 Gas14_error=[result['OCR285'],547 result['OCR286'],548 result['OCR287'],549 result['OCR288'],550 result['OCR289'],551 result['OCR290'],552 result['OCR291'],553 result['OCR292'],554 result['OCR293'],555 result['OCR294']556 ]557558 Gas15_name=''559 Gas15_set=[]560 Gas15_error=[]561 Gas15_dict={}562 Gas15_name=result['OCR295']563 Gas15_set=[result['OCR296'],564 result['OCR297'],565 result['OCR298'],566 result['OCR299'],567 result['OCR300'],568 result['OCR301'],569 result['OCR302'],570 result['OCR303'],571 result['OCR304'],572 result['OCR305']573 ]574 Gas15_error=[result['OCR306'],575 result['OCR307'],576 result['OCR308'],577 result['OCR309'],578 result['OCR310'],579 result['OCR311'],580 result['OCR312'],581 result['OCR313'],582 result['OCR314'],583 result['OCR315']584 ]585586 Gas16_name=''587 Gas16_set=[]588 Gas16_error=[]589 Gas16_dict={}590 Gas16_name=result['OCR316']591 Gas16_set=[result['OCR317'],592 result['OCR318'],593 result['OCR319'],594 result['OCR320'],595 result['OCR321'],596 result['OCR322'],597 result['OCR323'],598 result['OCR324'],599 result['OCR325'],600 result['OCR326']601 ]602 Gas16_error=[result['OCR327'],603 result['OCR328'],604 result['OCR329'],605 result['OCR330'],606 result['OCR331'],607 result['OCR332'],608 result['OCR333'],609 result['OCR334'],610 result['OCR335'],611 result['OCR336']612 ]613 '''614 Gas17_name=''615 Gas17_set=[]616 Gas17_error=[]617 Gas17_dict={}618 Gas17_name='Tuning' + result['OCR337']619 Gas17_set=[result['OCR338'],620 result['OCR339'],621 result['OCR340'],622 result['OCR341'],623 result['OCR342'],624 result['OCR343'],625 result['OCR344'],626 result['OCR345'],627 result['OCR346'],628 result['OCR347']629 ]630 Gas17_error=[result['OCR348'],631 result['OCR349'],632 result['OCR350'],633 result['OCR351'],634 result['OCR352'],635 result['OCR353'],636 result['OCR354'],637 result['OCR355'],638 result['OCR356'],639 result['OCR357']640 ]641642 Gas18_name=''643 Gas18_set=[]644 Gas18_error=[]645 Gas18_dict={}646 Gas18_name='Tuning' + result['OCR358']647 Gas18_set=[result['OCR359'],648 result['OCR360'],649 result['OCR361'],650 result['OCR362'],651 result['OCR363'],652 result['OCR364'],653 result['OCR365'],654 result['OCR366'],655 result['OCR367'],656 result['OCR368']657 ]658 Gas18_error=[result['OCR369'],659 result['OCR370'],660 result['OCR371'],661 result['OCR372'],662 result['OCR373'],663 result['OCR374'],664 result['OCR375'],665 result['OCR376'],666 result['OCR377'],667 result['OCR378']668 ]669 '''670 for i in range(len(Gas1_set)):671 GasError(Gas1_dict, Gas1_set[i], Gas1_error[i])672 GasError(Gas2_dict, Gas2_set[i], Gas2_error[i])673 GasError(Gas3_dict, Gas3_set[i], Gas3_error[i])674 GasError(Gas4_dict, Gas4_set[i], Gas4_error[i])675 GasError(Gas5_dict, Gas5_set[i], Gas5_error[i])676 GasError(Gas6_dict, Gas6_set[i], Gas6_error[i])677 GasError(Gas7_dict, Gas7_set[i], Gas7_error[i])678 GasError(Gas8_dict, Gas8_set[i], Gas8_error[i])679 GasError(Gas9_dict, Gas9_set[i], Gas9_error[i])680 GasError(Gas10_dict, Gas10_set[i], Gas10_error[i])681 GasError(Gas11_dict, Gas11_set[i], Gas11_error[i])682 GasError(Gas12_dict, Gas12_set[i], Gas12_error[i])683 GasError(Gas13_dict, Gas13_set[i], Gas13_error[i])684 GasError(Gas14_dict, Gas14_set[i], Gas14_error[i])685 GasError(Gas15_dict, Gas15_set[i], Gas15_error[i])686 GasError(Gas16_dict, Gas16_set[i], Gas16_error[i])687 #GasError(Gas17_dict, Gas17_set[i], Gas17_error[i])688 #GasError(Gas18_dict, Gas18_set[i], Gas18_error[i])689690 Gas_dict_temp[Gas1_name]=json.dumps(Gas1_dict)691 Gas_dict_temp[Gas2_name]=json.dumps(Gas2_dict)692 Gas_dict_temp[Gas3_name]=json.dumps(Gas3_dict)693 Gas_dict_temp[Gas4_name]=json.dumps(Gas4_dict)694 Gas_dict_temp[Gas5_name]=json.dumps(Gas5_dict)695 Gas_dict_temp[Gas6_name]=json.dumps(Gas6_dict)696 Gas_dict_temp[Gas7_name]=json.dumps(Gas7_dict)697 Gas_dict_temp[Gas8_name]=json.dumps(Gas8_dict)698 Gas_dict_temp[Gas9_name]=json.dumps(Gas9_dict)699 Gas_dict_temp[Gas10_name]=json.dumps(Gas10_dict)700 Gas_dict_temp[Gas11_name]=json.dumps(Gas11_dict)701 Gas_dict_temp[Gas12_name]=json.dumps(Gas12_dict)702 Gas_dict_temp[Gas13_name]=json.dumps(Gas13_dict)703 Gas_dict_temp[Gas14_name]=json.dumps(Gas14_dict)704 Gas_dict_temp[Gas15_name]=json.dumps(Gas15_dict)705 Gas_dict_temp[Gas16_name]=json.dumps(Gas16_dict)706 #Gas_dict_temp[Gas17_name]=json.dumps(Gas17_dict)707 #Gas_dict_temp[Gas18_name]=json.dumps(Gas18_dict)708 #SHOW_IMAGE(img)709710 E_ID=json.loads(Gas_param[F_ID])['E_ID']711 Gas_dict_temp['E_ID'] = E_ID712 Gas_return_temp['E_ID'] = E_ID713 U_ID=json.loads(Gas_param[F_ID])['U_ID']714 Gas_dict_temp['U_ID'] = U_ID715 Gas_return_temp['U_ID'] = U_ID716 O_ID=json.loads(Gas_param[F_ID])['O_ID']717 Gas_dict_temp['O_ID'] = O_ID718 Gas_return_temp['O_ID'] = O_ID719 F_ID=json.loads(Gas_param[F_ID])['F_ID']720 Gas_dict_temp['F_ID'] = F_ID721 Gas_return_temp['F_ID'] = F_ID722 723 Gas_set = json.loads(Gas_param[F_ID])['Gas_set']724 Gas_set = Gas_set.split(',')725 726 for i in range(len(Gas_set)):727 temp=Gas_set[i].split('_')728 try:729 Gas_return_temp[temp[0]+'_'+temp[1]]=json.loads(Gas_dict_temp[temp[0]])[temp[1]]730 except:731 if not temp[0] in Gas_dict_temp.keys():732 Gas_return_temp[temp[0]]='Unknown'733 elif not temp[1] in json.loads(Gas_dict_temp[temp[0]]).keys():734 Gas_return_temp[temp[0]+'_'+temp[1]]='Unknown'735736 lock.acquire()737738 Gas_dict[F_ID] = json.dumps(Gas_dict_temp)739 Gas_return[F_ID] = json.dumps(Gas_return_temp)740741 lock.release()742743 #print(Gas_dict)744 #print(Gas_return)745746 return {'result':'Data for F_ID = ' + F_ID +', E_ID = ' + E_ID + ', and U_ID = ' + U_ID + ' is ready.', 'is_success':'Y'}747 748 else:749 return {'error_message':'Cannot find data matching F_ID = ' + F_ID +', E_ID = ' + E_ID + ', and U_ID = ' + U_ID + '.', 'is_success':'N'}750751752 753 754 755756def GasError(Gas_Temp_Dict, Set, Error):757 758 if Set=='' or Error=='':759 pass760 elif not Set in Gas_Temp_Dict.keys():761 Gas_Temp_Dict[Set]=Error762763'''764def GetGasSet(**kwargs):765 766 global Gas_dict, Gas_return, Gas_param767 768 return_dict = {}769 770 try:771 E_ID=kwargs['E_ID']772 except:773 return {'Error':'Cannot find E_ID.'}774 775 try:776 U_ID=kwargs['U_ID']777 except:778 return {'Error':'Cannot find U_ID.'}779 780 ID = E_ID + U_ID781 782 if (ID in Gas_return.keys() and ID in Gas_dict.keys()) and ID in Gas_param.keys():783 784 try:785 Gas_name = kwargs['Gas_name']786 Gas_name = Gas_name.split(',')787 Gas_dict_temp = json.loads(Gas_dict[ID])788 789 if Gas_name[0] == '':790 return {'Error':'There is no input parameter.'}791 792 for i in range(len(Gas_name)):793 794 try:795 return_dict[Gas_name[i]] = Gas_dict_temp[Gas_name[i]]796 except:797 if Gas_name[i] == '':798 return {'Error':'There is error in input parameters. Please check them.'}799 else:800 return_dict[Gas_name[i]] = 'Unknown Gas_name'801 802 return return_dict 803 except:804 return {'Error':'Cannot find Gas_name'}805 else:806 return {'Error':'Cannot find data matching E_ID = ' + E_ID +' and U_ID = ' + U_ID + '.'}807 808 809 return return_dict810''' 811 812 813def GetGasData(**kwargs):814 815 global Gas_dict, Gas_return, Gas_param, lock816 817 return_dict = {}818 819 try:820 F_ID=kwargs['F_ID']821 except:822 return {'error_message':'Cannot find F_ID.', 'is_success':'N'}823 824 825 if (F_ID in Gas_return.keys() and F_ID in Gas_dict.keys()) and F_ID in Gas_param.keys():826827 return_dict['data'] = Gas_return[F_ID]828 return_dict['is_success'] = 'Y'829 830 lock.acquire()831 832 Gas_dict.pop(F_ID, None)833 Gas_return.pop(F_ID, None)834 Gas_param.pop(F_ID, None)835 836 lock.release()837 838 return return_dict 839840 else:841 return {'error_message':'Cannot find data matching F_ID = ' + F_ID + '.', 'is_success':'N'}842 843 ...

Full Screen

Full Screen

test_result.py

Source:test_result.py Github

copy

Full Screen

1import sys2import textwrap3from StringIO import StringIO4from test import test_support5import traceback6import unittest7class Test_TestResult(unittest.TestCase):8 # Note: there are not separate tests for TestResult.wasSuccessful(),9 # TestResult.errors, TestResult.failures, TestResult.testsRun or10 # TestResult.shouldStop because these only have meaning in terms of11 # other TestResult methods.12 #13 # Accordingly, tests for the aforenamed attributes are incorporated14 # in with the tests for the defining methods.15 ################################################################16 def test_init(self):17 result = unittest.TestResult()18 self.assertTrue(result.wasSuccessful())19 self.assertEqual(len(result.errors), 0)20 self.assertEqual(len(result.failures), 0)21 self.assertEqual(result.testsRun, 0)22 self.assertEqual(result.shouldStop, False)23 self.assertIsNone(result._stdout_buffer)24 self.assertIsNone(result._stderr_buffer)25 # "This method can be called to signal that the set of tests being26 # run should be aborted by setting the TestResult's shouldStop27 # attribute to True."28 def test_stop(self):29 result = unittest.TestResult()30 result.stop()31 self.assertEqual(result.shouldStop, True)32 # "Called when the test case test is about to be run. The default33 # implementation simply increments the instance's testsRun counter."34 def test_startTest(self):35 class Foo(unittest.TestCase):36 def test_1(self):37 pass38 test = Foo('test_1')39 result = unittest.TestResult()40 result.startTest(test)41 self.assertTrue(result.wasSuccessful())42 self.assertEqual(len(result.errors), 0)43 self.assertEqual(len(result.failures), 0)44 self.assertEqual(result.testsRun, 1)45 self.assertEqual(result.shouldStop, False)46 result.stopTest(test)47 # "Called after the test case test has been executed, regardless of48 # the outcome. The default implementation does nothing."49 def test_stopTest(self):50 class Foo(unittest.TestCase):51 def test_1(self):52 pass53 test = Foo('test_1')54 result = unittest.TestResult()55 result.startTest(test)56 self.assertTrue(result.wasSuccessful())57 self.assertEqual(len(result.errors), 0)58 self.assertEqual(len(result.failures), 0)59 self.assertEqual(result.testsRun, 1)60 self.assertEqual(result.shouldStop, False)61 result.stopTest(test)62 # Same tests as above; make sure nothing has changed63 self.assertTrue(result.wasSuccessful())64 self.assertEqual(len(result.errors), 0)65 self.assertEqual(len(result.failures), 0)66 self.assertEqual(result.testsRun, 1)67 self.assertEqual(result.shouldStop, False)68 # "Called before and after tests are run. The default implementation does nothing."69 def test_startTestRun_stopTestRun(self):70 result = unittest.TestResult()71 result.startTestRun()72 result.stopTestRun()73 # "addSuccess(test)"74 # ...75 # "Called when the test case test succeeds"76 # ...77 # "wasSuccessful() - Returns True if all tests run so far have passed,78 # otherwise returns False"79 # ...80 # "testsRun - The total number of tests run so far."81 # ...82 # "errors - A list containing 2-tuples of TestCase instances and83 # formatted tracebacks. Each tuple represents a test which raised an84 # unexpected exception. Contains formatted85 # tracebacks instead of sys.exc_info() results."86 # ...87 # "failures - A list containing 2-tuples of TestCase instances and88 # formatted tracebacks. Each tuple represents a test where a failure was89 # explicitly signalled using the TestCase.fail*() or TestCase.assert*()90 # methods. Contains formatted tracebacks instead91 # of sys.exc_info() results."92 def test_addSuccess(self):93 class Foo(unittest.TestCase):94 def test_1(self):95 pass96 test = Foo('test_1')97 result = unittest.TestResult()98 result.startTest(test)99 result.addSuccess(test)100 result.stopTest(test)101 self.assertTrue(result.wasSuccessful())102 self.assertEqual(len(result.errors), 0)103 self.assertEqual(len(result.failures), 0)104 self.assertEqual(result.testsRun, 1)105 self.assertEqual(result.shouldStop, False)106 # "addFailure(test, err)"107 # ...108 # "Called when the test case test signals a failure. err is a tuple of109 # the form returned by sys.exc_info(): (type, value, traceback)"110 # ...111 # "wasSuccessful() - Returns True if all tests run so far have passed,112 # otherwise returns False"113 # ...114 # "testsRun - The total number of tests run so far."115 # ...116 # "errors - A list containing 2-tuples of TestCase instances and117 # formatted tracebacks. Each tuple represents a test which raised an118 # unexpected exception. Contains formatted119 # tracebacks instead of sys.exc_info() results."120 # ...121 # "failures - A list containing 2-tuples of TestCase instances and122 # formatted tracebacks. Each tuple represents a test where a failure was123 # explicitly signalled using the TestCase.fail*() or TestCase.assert*()124 # methods. Contains formatted tracebacks instead125 # of sys.exc_info() results."126 def test_addFailure(self):127 class Foo(unittest.TestCase):128 def test_1(self):129 pass130 test = Foo('test_1')131 try:132 test.fail("foo")133 except:134 exc_info_tuple = sys.exc_info()135 result = unittest.TestResult()136 result.startTest(test)137 result.addFailure(test, exc_info_tuple)138 result.stopTest(test)139 self.assertFalse(result.wasSuccessful())140 self.assertEqual(len(result.errors), 0)141 self.assertEqual(len(result.failures), 1)142 self.assertEqual(result.testsRun, 1)143 self.assertEqual(result.shouldStop, False)144 test_case, formatted_exc = result.failures[0]145 self.assertIs(test_case, test)146 self.assertIsInstance(formatted_exc, str)147 # "addError(test, err)"148 # ...149 # "Called when the test case test raises an unexpected exception err150 # is a tuple of the form returned by sys.exc_info():151 # (type, value, traceback)"152 # ...153 # "wasSuccessful() - Returns True if all tests run so far have passed,154 # otherwise returns False"155 # ...156 # "testsRun - The total number of tests run so far."157 # ...158 # "errors - A list containing 2-tuples of TestCase instances and159 # formatted tracebacks. Each tuple represents a test which raised an160 # unexpected exception. Contains formatted161 # tracebacks instead of sys.exc_info() results."162 # ...163 # "failures - A list containing 2-tuples of TestCase instances and164 # formatted tracebacks. Each tuple represents a test where a failure was165 # explicitly signalled using the TestCase.fail*() or TestCase.assert*()166 # methods. Contains formatted tracebacks instead167 # of sys.exc_info() results."168 def test_addError(self):169 class Foo(unittest.TestCase):170 def test_1(self):171 pass172 test = Foo('test_1')173 try:174 raise TypeError()175 except:176 exc_info_tuple = sys.exc_info()177 result = unittest.TestResult()178 result.startTest(test)179 result.addError(test, exc_info_tuple)180 result.stopTest(test)181 self.assertFalse(result.wasSuccessful())182 self.assertEqual(len(result.errors), 1)183 self.assertEqual(len(result.failures), 0)184 self.assertEqual(result.testsRun, 1)185 self.assertEqual(result.shouldStop, False)186 test_case, formatted_exc = result.errors[0]187 self.assertIs(test_case, test)188 self.assertIsInstance(formatted_exc, str)189 def testGetDescriptionWithoutDocstring(self):190 result = unittest.TextTestResult(None, True, 1)191 self.assertEqual(192 result.getDescription(self),193 'testGetDescriptionWithoutDocstring (' + __name__ +194 '.Test_TestResult)')195 @unittest.skipIf(sys.flags.optimize >= 2,196 "Docstrings are omitted with -O2 and above")197 def testGetDescriptionWithOneLineDocstring(self):198 """Tests getDescription() for a method with a docstring."""199 result = unittest.TextTestResult(None, True, 1)200 self.assertEqual(201 result.getDescription(self),202 ('testGetDescriptionWithOneLineDocstring '203 '(' + __name__ + '.Test_TestResult)\n'204 'Tests getDescription() for a method with a docstring.'))205 @unittest.skipIf(sys.flags.optimize >= 2,206 "Docstrings are omitted with -O2 and above")207 def testGetDescriptionWithMultiLineDocstring(self):208 """Tests getDescription() for a method with a longer docstring.209 The second line of the docstring.210 """211 result = unittest.TextTestResult(None, True, 1)212 self.assertEqual(213 result.getDescription(self),214 ('testGetDescriptionWithMultiLineDocstring '215 '(' + __name__ + '.Test_TestResult)\n'216 'Tests getDescription() for a method with a longer '217 'docstring.'))218 def testStackFrameTrimming(self):219 class Frame(object):220 class tb_frame(object):221 f_globals = {}222 result = unittest.TestResult()223 self.assertFalse(result._is_relevant_tb_level(Frame))224 Frame.tb_frame.f_globals['__unittest'] = True225 self.assertTrue(result._is_relevant_tb_level(Frame))226 def testFailFast(self):227 result = unittest.TestResult()228 result._exc_info_to_string = lambda *_: ''229 result.failfast = True230 result.addError(None, None)231 self.assertTrue(result.shouldStop)232 result = unittest.TestResult()233 result._exc_info_to_string = lambda *_: ''234 result.failfast = True235 result.addFailure(None, None)236 self.assertTrue(result.shouldStop)237 result = unittest.TestResult()238 result._exc_info_to_string = lambda *_: ''239 result.failfast = True240 result.addUnexpectedSuccess(None)241 self.assertTrue(result.shouldStop)242 def testFailFastSetByRunner(self):243 runner = unittest.TextTestRunner(stream=StringIO(), failfast=True)244 def test(result):245 self.assertTrue(result.failfast)246 runner.run(test)247classDict = dict(unittest.TestResult.__dict__)248for m in ('addSkip', 'addExpectedFailure', 'addUnexpectedSuccess',249 '__init__'):250 del classDict[m]251def __init__(self, stream=None, descriptions=None, verbosity=None):252 self.failures = []253 self.errors = []254 self.testsRun = 0255 self.shouldStop = False256 self.buffer = False257classDict['__init__'] = __init__258OldResult = type('OldResult', (object,), classDict)259class Test_OldTestResult(unittest.TestCase):260 def assertOldResultWarning(self, test, failures):261 with test_support.check_warnings(("TestResult has no add.+ method,",262 RuntimeWarning)):263 result = OldResult()264 test.run(result)265 self.assertEqual(len(result.failures), failures)266 def testOldTestResult(self):267 class Test(unittest.TestCase):268 def testSkip(self):269 self.skipTest('foobar')270 @unittest.expectedFailure271 def testExpectedFail(self):272 raise TypeError273 @unittest.expectedFailure274 def testUnexpectedSuccess(self):275 pass276 for test_name, should_pass in (('testSkip', True),277 ('testExpectedFail', True),278 ('testUnexpectedSuccess', False)):279 test = Test(test_name)280 self.assertOldResultWarning(test, int(not should_pass))281 def testOldTestTesultSetup(self):282 class Test(unittest.TestCase):283 def setUp(self):284 self.skipTest('no reason')285 def testFoo(self):286 pass287 self.assertOldResultWarning(Test('testFoo'), 0)288 def testOldTestResultClass(self):289 @unittest.skip('no reason')290 class Test(unittest.TestCase):291 def testFoo(self):292 pass293 self.assertOldResultWarning(Test('testFoo'), 0)294 def testOldResultWithRunner(self):295 class Test(unittest.TestCase):296 def testFoo(self):297 pass298 runner = unittest.TextTestRunner(resultclass=OldResult,299 stream=StringIO())300 # This will raise an exception if TextTestRunner can't handle old301 # test result objects302 runner.run(Test('testFoo'))303class MockTraceback(object):304 @staticmethod305 def format_exception(*_):306 return ['A traceback']307def restore_traceback():308 unittest.result.traceback = traceback309class TestOutputBuffering(unittest.TestCase):310 def setUp(self):311 self._real_out = sys.stdout312 self._real_err = sys.stderr313 def tearDown(self):314 sys.stdout = self._real_out315 sys.stderr = self._real_err316 def testBufferOutputOff(self):317 real_out = self._real_out318 real_err = self._real_err319 result = unittest.TestResult()320 self.assertFalse(result.buffer)321 self.assertIs(real_out, sys.stdout)322 self.assertIs(real_err, sys.stderr)323 result.startTest(self)324 self.assertIs(real_out, sys.stdout)325 self.assertIs(real_err, sys.stderr)326 def testBufferOutputStartTestAddSuccess(self):327 real_out = self._real_out328 real_err = self._real_err329 result = unittest.TestResult()330 self.assertFalse(result.buffer)331 result.buffer = True332 self.assertIs(real_out, sys.stdout)333 self.assertIs(real_err, sys.stderr)334 result.startTest(self)335 self.assertIsNot(real_out, sys.stdout)336 self.assertIsNot(real_err, sys.stderr)337 self.assertIsInstance(sys.stdout, StringIO)338 self.assertIsInstance(sys.stderr, StringIO)339 self.assertIsNot(sys.stdout, sys.stderr)340 out_stream = sys.stdout341 err_stream = sys.stderr342 result._original_stdout = StringIO()343 result._original_stderr = StringIO()344 print 'foo'345 print >> sys.stderr, 'bar'346 self.assertEqual(out_stream.getvalue(), 'foo\n')347 self.assertEqual(err_stream.getvalue(), 'bar\n')348 self.assertEqual(result._original_stdout.getvalue(), '')349 self.assertEqual(result._original_stderr.getvalue(), '')350 result.addSuccess(self)351 result.stopTest(self)352 self.assertIs(sys.stdout, result._original_stdout)353 self.assertIs(sys.stderr, result._original_stderr)354 self.assertEqual(result._original_stdout.getvalue(), '')355 self.assertEqual(result._original_stderr.getvalue(), '')356 self.assertEqual(out_stream.getvalue(), '')357 self.assertEqual(err_stream.getvalue(), '')358 def getStartedResult(self):359 result = unittest.TestResult()360 result.buffer = True361 result.startTest(self)362 return result363 def testBufferOutputAddErrorOrFailure(self):364 unittest.result.traceback = MockTraceback365 self.addCleanup(restore_traceback)366 for message_attr, add_attr, include_error in [367 ('errors', 'addError', True),368 ('failures', 'addFailure', False),369 ('errors', 'addError', True),370 ('failures', 'addFailure', False)371 ]:372 result = self.getStartedResult()373 buffered_out = sys.stdout374 buffered_err = sys.stderr375 result._original_stdout = StringIO()376 result._original_stderr = StringIO()377 print >> sys.stdout, 'foo'378 if include_error:379 print >> sys.stderr, 'bar'380 addFunction = getattr(result, add_attr)381 addFunction(self, (None, None, None))382 result.stopTest(self)383 result_list = getattr(result, message_attr)384 self.assertEqual(len(result_list), 1)385 test, message = result_list[0]386 expectedOutMessage = textwrap.dedent("""387 Stdout:388 foo389 """)390 expectedErrMessage = ''391 if include_error:392 expectedErrMessage = textwrap.dedent("""393 Stderr:394 bar395 """)396 expectedFullMessage = 'A traceback%s%s' % (expectedOutMessage, expectedErrMessage)397 self.assertIs(test, self)398 self.assertEqual(result._original_stdout.getvalue(), expectedOutMessage)399 self.assertEqual(result._original_stderr.getvalue(), expectedErrMessage)400 self.assertMultiLineEqual(message, expectedFullMessage)401 def testBufferSetupClass(self):402 result = unittest.TestResult()403 result.buffer = True404 class Foo(unittest.TestCase):405 @classmethod406 def setUpClass(cls):407 1//0408 def test_foo(self):409 pass410 suite = unittest.TestSuite([Foo('test_foo')])411 suite(result)412 self.assertEqual(len(result.errors), 1)413 def testBufferTearDownClass(self):414 result = unittest.TestResult()415 result.buffer = True416 class Foo(unittest.TestCase):417 @classmethod418 def tearDownClass(cls):419 1//0420 def test_foo(self):421 pass422 suite = unittest.TestSuite([Foo('test_foo')])423 suite(result)424 self.assertEqual(len(result.errors), 1)425 def testBufferSetUpModule(self):426 result = unittest.TestResult()427 result.buffer = True428 class Foo(unittest.TestCase):429 def test_foo(self):430 pass431 class Module(object):432 @staticmethod433 def setUpModule():434 1//0435 Foo.__module__ = 'Module'436 sys.modules['Module'] = Module437 self.addCleanup(sys.modules.pop, 'Module')438 suite = unittest.TestSuite([Foo('test_foo')])439 suite(result)440 self.assertEqual(len(result.errors), 1)441 def testBufferTearDownModule(self):442 result = unittest.TestResult()443 result.buffer = True444 class Foo(unittest.TestCase):445 def test_foo(self):446 pass447 class Module(object):448 @staticmethod449 def tearDownModule():450 1//0451 Foo.__module__ = 'Module'452 sys.modules['Module'] = Module453 self.addCleanup(sys.modules.pop, 'Module')454 suite = unittest.TestSuite([Foo('test_foo')])455 suite(result)456 self.assertEqual(len(result.errors), 1)457if __name__ == '__main__':...

Full Screen

Full Screen

test_exporters.py

Source:test_exporters.py Github

copy

Full Screen

1"""2mbed SDK3Copyright (c) 2011-2014 ARM Limited4Licensed under the Apache License, Version 2.0 (the "License");5you may not use this file except in compliance with the License.6You may obtain a copy of the License at7 http://www.apache.org/licenses/LICENSE-2.08Unless required by applicable law or agreed to in writing, software9distributed under the License is distributed on an "AS IS" BASIS,10WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.11See the License for the specific language governing permissions and12limitations under the License.13Author: Przemyslaw Wirkus <Przemyslaw.wirkus@arm.com>14"""15from tools.utils import construct_enum, mkdir16from prettytable import PrettyTable17import os18ResultExporterType = construct_enum(HTML='Html_Exporter',19 JUNIT='JUnit_Exporter',20 JUNIT_OPER='JUnit_Exporter_Interoperability',21 BUILD='Build_Exporter',22 TEXT='Text_Exporter',23 PRINT='Print_Exporter')24class ReportExporter():25 """ Class exports extended test result Python data structure to26 different formats like HTML, JUnit XML.27 Parameter 'test_result_ext' format:28 u'uARM': { u'LPC1768': { 'MBED_2': { 0: { 'copy_method': 'shutils.copy()',29 'duration': 20,30 'elapsed_time': 1.7929999828338623,31 'output': 'Host test instrumentation on ...\r\n',32 'result': 'OK',33 'target_name': u'LPC1768',34 'description': 'stdio',35 'id': u'MBED_2',36 'toolchain_name': u'uARM'}},37 """38 CSS_STYLE = """<style>39 .name{40 border: 1px solid;41 border-radius: 25px;42 width: 100px;43 }44 .tooltip{45 position:absolute;46 background-color: #F5DA81;47 display:none;48 }49 </style>50 """51 JAVASCRIPT = """52 <script type="text/javascript">53 function show (elem) {54 elem.style.display = "block";55 }56 function hide (elem) {57 elem.style.display = "";58 }59 </script>60 """61 def __init__(self, result_exporter_type, package="test"):62 self.result_exporter_type = result_exporter_type63 self.package = package64 def report(self, test_summary_ext, test_suite_properties=None,65 print_log_for_failures=True):66 """ Invokes report depending on exporter_type set in constructor67 """68 if self.result_exporter_type == ResultExporterType.HTML:69 # HTML exporter70 return self.exporter_html(test_summary_ext, test_suite_properties)71 elif self.result_exporter_type == ResultExporterType.JUNIT:72 # JUNIT exporter for results from test suite73 return self.exporter_junit(test_summary_ext, test_suite_properties)74 elif self.result_exporter_type == ResultExporterType.JUNIT_OPER:75 # JUNIT exporter for interoperability test76 return self.exporter_junit_ioper(test_summary_ext, test_suite_properties)77 elif self.result_exporter_type == ResultExporterType.PRINT:78 # JUNIT exporter for interoperability test79 return self.exporter_print(test_summary_ext, print_log_for_failures=print_log_for_failures)80 elif self.result_exporter_type == ResultExporterType.TEXT:81 return self.exporter_text(test_summary_ext)82 return None83 def report_to_file(self, test_summary_ext, file_name, test_suite_properties=None):84 """ Stores report to specified file85 """86 report = self.report(test_summary_ext, test_suite_properties=test_suite_properties)87 self.write_to_file(report, file_name)88 def write_to_file(self, report, file_name):89 if report is not None:90 dirname = os.path.dirname(file_name)91 if dirname:92 mkdir(dirname)93 with open(file_name, 'w') as f:94 f.write(report)95 def get_tooltip_name(self, toolchain, target, test_id, loop_no):96 """ Generate simple unique tool-tip name which can be used.97 For example as HTML <div> section id attribute.98 """99 return "target_test_%s_%s_%s_%s"% (toolchain.lower(), target.lower(), test_id.lower(), loop_no)100 def get_result_div_sections(self, test, test_no):101 """ Generates separate <DIV> sections which contains test results output.102 """103 RESULT_COLORS = {'OK': 'LimeGreen',104 'FAIL': 'Orange',105 'ERROR': 'LightCoral',106 'OTHER': 'LightGray',107 }108 tooltip_name = self.get_tooltip_name(test['toolchain_name'], test['target_name'], test['id'], test_no)109 background_color = RESULT_COLORS[test['result'] if test['result'] in RESULT_COLORS else 'OTHER']110 result_div_style = "background-color: %s"% background_color111 result = """<div class="name" style="%s" onmouseover="show(%s)" onmouseout="hide(%s)">112 <center>%s</center>113 <div class = "tooltip" id= "%s">114 <b>%s</b><br />115 <hr />116 <b>%s</b> in <b>%.2f sec</b><br />117 <hr />118 <small>119 %s120 </small>121 </div>122 </div>123 """% (result_div_style,124 tooltip_name,125 tooltip_name,126 test['result'],127 tooltip_name,128 test['target_name_unique'],129 test['description'],130 test['elapsed_time'],131 test['output'].replace('\n', '<br />'))132 return result133 def get_result_tree(self, test_results):134 """ If test was run in a loop (we got few results from the same test)135 we will show it in a column to see all results.136 This function produces HTML table with corresponding results.137 """138 result = ''139 for i, test_result in enumerate(test_results):140 result += '<table>'141 test_ids = sorted(test_result.keys())142 for test_no in test_ids:143 test = test_result[test_no]144 result += """<tr>145 <td valign="top">%s</td>146 </tr>"""% self.get_result_div_sections(test, "%d_%d" % (test_no, i))147 result += '</table>'148 return result149 def get_all_unique_test_ids(self, test_result_ext):150 """ Gets all unique test ids from all ran tests.151 We need this to create complete list of all test ran.152 """153 result = []154 targets = test_result_ext.keys()155 for target in targets:156 toolchains = test_result_ext[target].keys()157 for toolchain in toolchains:158 tests = test_result_ext[target][toolchain].keys()159 result.extend(tests)160 return sorted(list(set(result)))161 #162 # Exporters functions163 #164 def exporter_html(self, test_result_ext, test_suite_properties=None):165 """ Export test results in proprietary HTML format.166 """167 result = """<html>168 <head>169 <title>mbed SDK test suite test result report</title>170 %s171 %s172 </head>173 <body>174 """% (self.CSS_STYLE, self.JAVASCRIPT)175 unique_test_ids = self.get_all_unique_test_ids(test_result_ext)176 targets = sorted(test_result_ext.keys())177 result += '<table>'178 for target in targets:179 toolchains = sorted(test_result_ext[target].keys())180 for toolchain in toolchains:181 result += '<tr>'182 result += '<td></td>'183 result += '<td></td>'184 tests = sorted(test_result_ext[target][toolchain].keys())185 for test in unique_test_ids:186 result += """<td align="center">%s</td>"""% test187 result += """</tr>188 <tr>189 <td valign="center">%s</td>190 <td valign="center"><b>%s</b></td>191 """% (toolchain, target)192 for test in unique_test_ids:193 test_result = self.get_result_tree(test_result_ext[target][toolchain][test]) if test in tests else ''194 result += '<td>%s</td>'% (test_result)195 result += '</tr>'196 result += '</table>'197 result += '</body></html>'198 return result199 def exporter_junit_ioper(self, test_result_ext, test_suite_properties=None):200 from junit_xml import TestSuite, TestCase201 test_suites = []202 test_cases = []203 for platform in sorted(test_result_ext.keys()):204 # {platform : ['Platform', 'Result', 'Scope', 'Description'])205 test_cases = []206 for tr_result in test_result_ext[platform]:207 result, name, scope, description = tr_result208 classname = 'test.ioper.%s.%s.%s' % (platform, name, scope)209 elapsed_sec = 0210 _stdout = description211 _stderr = ''212 # Test case213 tc = TestCase(name, classname, elapsed_sec, _stdout, _stderr)214 # Test case extra failure / error info215 if result == 'FAIL':216 tc.add_failure_info(description, _stdout)217 elif result == 'ERROR':218 tc.add_error_info(description, _stdout)219 elif result == 'SKIP' or result == 'NOT_SUPPORTED':220 tc.add_skipped_info(description, _stdout)221 test_cases.append(tc)222 ts = TestSuite("test.suite.ioper.%s" % (platform), test_cases)223 test_suites.append(ts)224 return TestSuite.to_xml_string(test_suites)225 def exporter_junit(self, test_result_ext, test_suite_properties=None):226 """ Export test results in JUnit XML compliant format227 """228 from junit_xml import TestSuite, TestCase229 test_suites = []230 test_cases = []231 targets = sorted(test_result_ext.keys())232 for target in targets:233 toolchains = sorted(test_result_ext[target].keys())234 for toolchain in toolchains:235 test_cases = []236 tests = sorted(test_result_ext[target][toolchain].keys())237 for test in tests:238 test_results = test_result_ext[target][toolchain][test]239 for test_res in test_results:240 test_ids = sorted(test_res.keys())241 for test_no in test_ids:242 test_result = test_res[test_no]243 name = test_result['description']244 classname = '%s.%s.%s.%s'% (self.package, target, toolchain, test_result['id'])245 elapsed_sec = test_result['elapsed_time']246 _stdout = test_result['output']247 if 'target_name_unique' in test_result:248 _stderr = test_result['target_name_unique']249 else:250 _stderr = test_result['target_name']251 # Test case252 tc = TestCase(name, classname, elapsed_sec, _stdout, _stderr)253 # Test case extra failure / error info254 message = test_result['result']255 if test_result['result'] == 'FAIL':256 tc.add_failure_info(message, _stdout)257 elif test_result['result'] == 'SKIP' or test_result["result"] == 'NOT_SUPPORTED':258 tc.add_skipped_info(message, _stdout)259 elif test_result['result'] != 'OK':260 tc.add_error_info(message, _stdout)261 test_cases.append(tc)262 ts = TestSuite("test.suite.%s.%s"% (target, toolchain), test_cases, properties=test_suite_properties[target][toolchain])263 test_suites.append(ts)264 return TestSuite.to_xml_string(test_suites)265 def exporter_print_helper(self, array, print_log=False):266 for item in array:267 print " * %s::%s::%s" % (item["target_name"], item["toolchain_name"], item["id"])268 if print_log:269 log_lines = item["output"].split("\n")270 for log_line in log_lines:271 print " %s" % log_line272 def exporter_print(self, test_result_ext, print_log_for_failures=False):273 """ Export test results in print format.274 """275 failures = []276 skips = []277 successes = []278 unique_test_ids = self.get_all_unique_test_ids(test_result_ext)279 targets = sorted(test_result_ext.keys())280 for target in targets:281 toolchains = sorted(test_result_ext[target].keys())282 for toolchain in toolchains:283 tests = sorted(test_result_ext[target][toolchain].keys())284 for test in tests:285 test_runs = test_result_ext[target][toolchain][test]286 for test_runner in test_runs:287 #test_run = test_result_ext[target][toolchain][test][test_run_number][0]288 test_run = test_runner[0]289 290 if "result" in test_run:291 if test_run["result"] == "FAIL":292 failures.append(test_run)293 elif test_run["result"] == "SKIP" or test_run["result"] == "NOT_SUPPORTED":294 skips.append(test_run)295 elif test_run["result"] == "OK":296 successes.append(test_run)297 else:298 raise Exception("Unhandled result type: %s" % (test_run["result"]))299 else:300 raise Exception("'test_run' did not have a 'result' value")301 if successes:302 print "\n\nBuild successes:"303 self.exporter_print_helper(successes)304 if skips:305 print "\n\nBuild skips:"306 self.exporter_print_helper(skips)307 if failures:308 print "\n\nBuild failures:"309 self.exporter_print_helper(failures, print_log=print_log_for_failures)310 return False311 else:312 return True313 def exporter_text(self, test_result_ext):314 """ Prints well-formed summary with results (SQL table like)315 table shows target x test results matrix across316 """317 success_code = 0 # Success code that can be leter returned to318 # Pretty table package is used to print results319 pt = PrettyTable(["Result", "Target", "Toolchain", "Test ID", "Test Description",320 "Elapsed Time", "Timeout"])321 pt.align["Result"] = "l" # Left align322 pt.align["Target"] = "l" # Left align323 pt.align["Toolchain"] = "l" # Left align324 pt.align["Test ID"] = "l" # Left align325 pt.align["Test Description"] = "l" # Left align326 pt.padding_width = 1 # One space between column edges and contents (default)327 result_dict = {"OK" : 0,328 "FAIL" : 0,329 "ERROR" : 0,330 "UNDEF" : 0,331 "IOERR_COPY" : 0,332 "IOERR_DISK" : 0,333 "IOERR_SERIAL" : 0,334 "TIMEOUT" : 0,335 "NO_IMAGE" : 0,336 "MBED_ASSERT" : 0,337 "BUILD_FAILED" : 0,338 "NOT_SUPPORTED" : 0339 }340 unique_test_ids = self.get_all_unique_test_ids(test_result_ext)341 targets = sorted(test_result_ext.keys())342 for target in targets:343 toolchains = sorted(test_result_ext[target].keys())344 for toolchain in toolchains:345 test_cases = []346 tests = sorted(test_result_ext[target][toolchain].keys())347 for test in tests:348 test_results = test_result_ext[target][toolchain][test]349 for test_res in test_results:350 test_ids = sorted(test_res.keys())351 for test_no in test_ids:352 test_result = test_res[test_no]353 result_dict[test_result['result']] += 1354 pt.add_row([test_result['result'],355 test_result['target_name'],356 test_result['toolchain_name'],357 test_result['id'],358 test_result['description'],359 test_result['elapsed_time'],360 test_result['duration']])361 result = pt.get_string()362 result += "\n"363 # Print result count364 result += "Result: " + ' / '.join(['%s %s' % (value, key) for (key, value) in {k: v for k, v in result_dict.items() if v != 0}.iteritems()])...

Full Screen

Full Screen

default.py

Source:default.py Github

copy

Full Screen

1# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>2#3# This file is part of Ansible4#5# Ansible is free software: you can redistribute it and/or modify6# it under the terms of the GNU General Public License as published by7# the Free Software Foundation, either version 3 of the License, or8# (at your option) any later version.9#10# Ansible is distributed in the hope that it will be useful,11# but WITHOUT ANY WARRANTY; without even the implied warranty of12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the13# GNU General Public License for more details.14#15# You should have received a copy of the GNU General Public License16# along with Ansible. If not, see <http://www.gnu.org/licenses/>.17# Make coding more python3-ish18from __future__ import (absolute_import, division, print_function)19__metaclass__ = type20from ansible import constants as C21from ansible.plugins.callback import CallbackBase22from ansible.utils.color import colorize, hostcolor23class CallbackModule(CallbackBase):24 '''25 This is the default callback interface, which simply prints messages26 to stdout when new callback events are received.27 '''28 CALLBACK_VERSION = 2.029 CALLBACK_TYPE = 'stdout'30 CALLBACK_NAME = 'default'31 def __init__(self):32 self._play = None33 self._last_task_banner = None34 super(CallbackModule, self).__init__()35 def v2_runner_on_failed(self, result, ignore_errors=False):36 if self._play.strategy == 'free' and self._last_task_banner != result._task._uuid:37 self._print_task_banner(result._task)38 delegated_vars = result._result.get('_ansible_delegated_vars', None)39 if 'exception' in result._result:40 if self._display.verbosity < 3:41 # extract just the actual error message from the exception text42 error = result._result['exception'].strip().split('\n')[-1]43 msg = "An exception occurred during task execution. To see the full traceback, use -vvv. The error was: %s" % error44 else:45 msg = "An exception occurred during task execution. The full traceback is:\n" + result._result['exception']46 self._display.display(msg, color=C.COLOR_ERROR)47 self._handle_warnings(result._result)48 if result._task.loop and 'results' in result._result:49 self._process_items(result)50 else:51 if delegated_vars:52 self._display.display("fatal: [%s -> %s]: FAILED! => %s" % (result._host.get_name(), delegated_vars['ansible_host'], self._dump_results(result._result)), color=C.COLOR_ERROR)53 else:54 self._display.display("fatal: [%s]: FAILED! => %s" % (result._host.get_name(), self._dump_results(result._result)), color=C.COLOR_ERROR)55 if ignore_errors:56 self._display.display("...ignoring", color=C.COLOR_SKIP)57 def v2_runner_on_ok(self, result):58 if self._play.strategy == 'free' and self._last_task_banner != result._task._uuid:59 self._print_task_banner(result._task)60 self._clean_results(result._result, result._task.action)61 delegated_vars = result._result.get('_ansible_delegated_vars', None)62 self._clean_results(result._result, result._task.action)63 if result._task.action in ('include', 'include_role'):64 return65 elif result._result.get('changed', False):66 if delegated_vars:67 msg = "changed: [%s -> %s]" % (result._host.get_name(), delegated_vars['ansible_host'])68 else:69 msg = "changed: [%s]" % result._host.get_name()70 color = C.COLOR_CHANGED71 else:72 if delegated_vars:73 msg = "ok: [%s -> %s]" % (result._host.get_name(), delegated_vars['ansible_host'])74 else:75 msg = "ok: [%s]" % result._host.get_name()76 color = C.COLOR_OK77 self._handle_warnings(result._result)78 if result._task.loop and 'results' in result._result:79 self._process_items(result)80 else:81 if (self._display.verbosity > 0 or '_ansible_verbose_always' in result._result) and not '_ansible_verbose_override' in result._result:82 msg += " => %s" % (self._dump_results(result._result),)83 self._display.display(msg, color=color)84 def v2_runner_on_skipped(self, result):85 if C.DISPLAY_SKIPPED_HOSTS:86 if self._play.strategy == 'free' and self._last_task_banner != result._task._uuid:87 self._print_task_banner(result._task)88 if result._task.loop and 'results' in result._result:89 self._process_items(result)90 else:91 msg = "skipping: [%s]" % result._host.get_name()92 if (self._display.verbosity > 0 or '_ansible_verbose_always' in result._result) and not '_ansible_verbose_override' in result._result:93 msg += " => %s" % self._dump_results(result._result)94 self._display.display(msg, color=C.COLOR_SKIP)95 def v2_runner_on_unreachable(self, result):96 if self._play.strategy == 'free' and self._last_task_banner != result._task._uuid:97 self._print_task_banner(result._task)98 delegated_vars = result._result.get('_ansible_delegated_vars', None)99 if delegated_vars:100 self._display.display("fatal: [%s -> %s]: UNREACHABLE! => %s" % (result._host.get_name(), delegated_vars['ansible_host'], self._dump_results(result._result)), color=C.COLOR_UNREACHABLE)101 else:102 self._display.display("fatal: [%s]: UNREACHABLE! => %s" % (result._host.get_name(), self._dump_results(result._result)), color=C.COLOR_UNREACHABLE)103 def v2_playbook_on_no_hosts_matched(self):104 self._display.display("skipping: no hosts matched", color=C.COLOR_SKIP)105 def v2_playbook_on_no_hosts_remaining(self):106 self._display.banner("NO MORE HOSTS LEFT")107 def v2_playbook_on_task_start(self, task, is_conditional):108 if self._play.strategy != 'free':109 self._print_task_banner(task)110 def _print_task_banner(self, task):111 # args can be specified as no_log in several places: in the task or in112 # the argument spec. We can check whether the task is no_log but the113 # argument spec can't be because that is only run on the target114 # machine and we haven't run it thereyet at this time.115 #116 # So we give people a config option to affect display of the args so117 # that they can secure this if they feel that their stdout is insecure118 # (shoulder surfing, logging stdout straight to a file, etc).119 args = ''120 if not task.no_log and C.DISPLAY_ARGS_TO_STDOUT:121 args = u', '.join(u'%s=%s' % a for a in task.args.items())122 args = u' %s' % args123 self._display.banner(u"TASK [%s%s]" % (task.get_name().strip(), args))124 if self._display.verbosity >= 2:125 path = task.get_path()126 if path:127 self._display.display(u"task path: %s" % path, color=C.COLOR_DEBUG)128 self._last_task_banner = task._uuid129 def v2_playbook_on_cleanup_task_start(self, task):130 self._display.banner("CLEANUP TASK [%s]" % task.get_name().strip())131 def v2_playbook_on_handler_task_start(self, task):132 self._display.banner("RUNNING HANDLER [%s]" % task.get_name().strip())133 def v2_playbook_on_play_start(self, play):134 name = play.get_name().strip()135 if not name:136 msg = u"PLAY"137 else:138 msg = u"PLAY [%s]" % name139 self._play = play140 self._display.banner(msg)141 def v2_on_file_diff(self, result):142 if result._task.loop and 'results' in result._result:143 for res in result._result['results']:144 if 'diff' in res and res['diff'] and res.get('changed', False):145 diff = self._get_diff(res['diff'])146 if diff:147 self._display.display(diff)148 elif 'diff' in result._result and result._result['diff'] and result._result.get('changed', False):149 diff = self._get_diff(result._result['diff'])150 if diff:151 self._display.display(diff)152 def v2_runner_item_on_ok(self, result):153 delegated_vars = result._result.get('_ansible_delegated_vars', None)154 if result._task.action in ('include', 'include_role'):155 return156 elif result._result.get('changed', False):157 msg = 'changed'158 color = C.COLOR_CHANGED159 else:160 msg = 'ok'161 color = C.COLOR_OK162 if delegated_vars:163 msg += ": [%s -> %s]" % (result._host.get_name(), delegated_vars['ansible_host'])164 else:165 msg += ": [%s]" % result._host.get_name()166 msg += " => (item=%s)" % (self._get_item(result._result),)167 if (self._display.verbosity > 0 or '_ansible_verbose_always' in result._result) and not '_ansible_verbose_override' in result._result:168 msg += " => %s" % self._dump_results(result._result)169 self._display.display(msg, color=color)170 def v2_runner_item_on_failed(self, result):171 delegated_vars = result._result.get('_ansible_delegated_vars', None)172 if 'exception' in result._result:173 if self._display.verbosity < 3:174 # extract just the actual error message from the exception text175 error = result._result['exception'].strip().split('\n')[-1]176 msg = "An exception occurred during task execution. To see the full traceback, use -vvv. The error was: %s" % error177 else:178 msg = "An exception occurred during task execution. The full traceback is:\n" + result._result['exception']179 self._display.display(msg, color=C.COLOR_ERROR)180 msg = "failed: "181 if delegated_vars:182 msg += "[%s -> %s]" % (result._host.get_name(), delegated_vars['ansible_host'])183 else:184 msg += "[%s]" % (result._host.get_name())185 self._handle_warnings(result._result)186 self._display.display(msg + " (item=%s) => %s" % (self._get_item(result._result), self._dump_results(result._result)), color=C.COLOR_ERROR)187 def v2_runner_item_on_skipped(self, result):188 if C.DISPLAY_SKIPPED_HOSTS:189 msg = "skipping: [%s] => (item=%s) " % (result._host.get_name(), self._get_item(result._result))190 if (self._display.verbosity > 0 or '_ansible_verbose_always' in result._result) and not '_ansible_verbose_override' in result._result:191 msg += " => %s" % self._dump_results(result._result)192 self._display.display(msg, color=C.COLOR_SKIP)193 def v2_playbook_on_include(self, included_file):194 msg = 'included: %s for %s' % (included_file._filename, ", ".join([h.name for h in included_file._hosts]))195 self._display.display(msg, color=C.COLOR_SKIP)196 def v2_playbook_on_stats(self, stats):197 self._display.banner("PLAY RECAP")198 hosts = sorted(stats.processed.keys())199 for h in hosts:200 t = stats.summarize(h)201 self._display.display(u"%s : %s %s %s %s" % (202 hostcolor(h, t),203 colorize(u'ok', t['ok'], C.COLOR_OK),204 colorize(u'changed', t['changed'], C.COLOR_CHANGED),205 colorize(u'unreachable', t['unreachable'], C.COLOR_UNREACHABLE),206 colorize(u'failed', t['failures'], C.COLOR_ERROR)),207 screen_only=True208 )209 self._display.display(u"%s : %s %s %s %s" % (210 hostcolor(h, t, False),211 colorize(u'ok', t['ok'], None),212 colorize(u'changed', t['changed'], None),213 colorize(u'unreachable', t['unreachable'], None),214 colorize(u'failed', t['failures'], None)),215 log_only=True216 )217 self._display.display("", screen_only=True)218 # print custom stats219 if C.SHOW_CUSTOM_STATS and stats.custom:220 self._display.banner("CUSTOM STATS: ")221 # per host222 #TODO: come up with 'pretty format'223 for k in sorted(stats.custom.keys()):224 if k == '_run':225 continue226 self._display.display('\t%s: %s' % (k, self._dump_results(stats.custom[k], indent=1).replace('\n','')))227 # print per run custom stats228 if '_run' in stats.custom:229 self._display.display("", screen_only=True)230 self._display.display('\tRUN: %s' % self._dump_results(stats.custom['_run'], indent=1).replace('\n',''))231 self._display.display("", screen_only=True)232 def v2_playbook_on_start(self, playbook):233 if self._display.verbosity > 1:234 from os.path import basename235 self._display.banner("PLAYBOOK: %s" % basename(playbook._file_name))236 if self._display.verbosity > 3:237 if self._options is not None:238 for option in dir(self._options):239 if option.startswith('_') or option in ['read_file', 'ensure_value', 'read_module']:240 continue241 val = getattr(self._options,option)242 if val:243 self._display.vvvv('%s: %s' % (option,val))244 def v2_runner_retry(self, result):245 msg = "FAILED - RETRYING: %s (%d retries left)." % (result._task, result._result['retries'] - result._result['attempts'])246 if (self._display.verbosity > 2 or '_ansible_verbose_always' in result._result) and not '_ansible_verbose_override' in result._result:247 msg += "Result was: %s" % self._dump_results(result._result)...

Full Screen

Full Screen

test_deferred.py

Source:test_deferred.py Github

copy

Full Screen

1from twisted.internet import defer2from twisted.trial import unittest3from twisted.trial import runner, reporter, util4from twisted.trial.test import detests5class TestSetUp(unittest.TestCase):6 def _loadSuite(self, klass):7 loader = runner.TestLoader()8 r = reporter.TestResult()9 s = loader.loadClass(klass)10 return r, s11 def test_success(self):12 result, suite = self._loadSuite(detests.DeferredSetUpOK)13 suite(result)14 self.failUnless(result.wasSuccessful())15 self.failUnlessEqual(result.testsRun, 1)16 def test_fail(self):17 self.failIf(detests.DeferredSetUpFail.testCalled)18 result, suite = self._loadSuite(detests.DeferredSetUpFail)19 suite(result)20 self.failIf(result.wasSuccessful())21 self.failUnlessEqual(result.testsRun, 1)22 self.failUnlessEqual(len(result.failures), 0)23 self.failUnlessEqual(len(result.errors), 1)24 self.failIf(detests.DeferredSetUpFail.testCalled)25 def test_callbackFail(self):26 self.failIf(detests.DeferredSetUpCallbackFail.testCalled)27 result, suite = self._loadSuite(detests.DeferredSetUpCallbackFail)28 suite(result)29 self.failIf(result.wasSuccessful())30 self.failUnlessEqual(result.testsRun, 1)31 self.failUnlessEqual(len(result.failures), 0)32 self.failUnlessEqual(len(result.errors), 1)33 self.failIf(detests.DeferredSetUpCallbackFail.testCalled)34 def test_error(self):35 self.failIf(detests.DeferredSetUpError.testCalled)36 result, suite = self._loadSuite(detests.DeferredSetUpError)37 suite(result)38 self.failIf(result.wasSuccessful())39 self.failUnlessEqual(result.testsRun, 1)40 self.failUnlessEqual(len(result.failures), 0)41 self.failUnlessEqual(len(result.errors), 1)42 self.failIf(detests.DeferredSetUpError.testCalled)43 def test_skip(self):44 self.failIf(detests.DeferredSetUpSkip.testCalled)45 result, suite = self._loadSuite(detests.DeferredSetUpSkip)46 suite(result)47 self.failUnless(result.wasSuccessful())48 self.failUnlessEqual(result.testsRun, 1)49 self.failUnlessEqual(len(result.failures), 0)50 self.failUnlessEqual(len(result.errors), 0)51 self.failUnlessEqual(len(result.skips), 1)52 self.failIf(detests.DeferredSetUpSkip.testCalled)53class TestNeverFire(unittest.TestCase):54 def setUp(self):55 self._oldTimeout = util.DEFAULT_TIMEOUT_DURATION56 util.DEFAULT_TIMEOUT_DURATION = 0.157 def tearDown(self):58 util.DEFAULT_TIMEOUT_DURATION = self._oldTimeout59 def _loadSuite(self, klass):60 loader = runner.TestLoader()61 r = reporter.TestResult()62 s = loader.loadClass(klass)63 return r, s64 def test_setUp(self):65 self.failIf(detests.DeferredSetUpNeverFire.testCalled)66 result, suite = self._loadSuite(detests.DeferredSetUpNeverFire)67 suite(result)68 self.failIf(result.wasSuccessful())69 self.failUnlessEqual(result.testsRun, 1)70 self.failUnlessEqual(len(result.failures), 0)71 self.failUnlessEqual(len(result.errors), 1)72 self.failIf(detests.DeferredSetUpNeverFire.testCalled)73 self.failUnless(result.errors[0][1].check(defer.TimeoutError))74class TestTester(unittest.TestCase):75 def getTest(self, name):76 raise NotImplementedError("must override me")77 def runTest(self, name):78 result = reporter.TestResult()79 self.getTest(name).run(result)80 return result81class TestDeferred(TestTester):82 def getTest(self, name):83 return detests.DeferredTests(name)84 def test_pass(self):85 result = self.runTest('test_pass')86 self.failUnless(result.wasSuccessful())87 self.failUnlessEqual(result.testsRun, 1)88 def test_passGenerated(self):89 result = self.runTest('test_passGenerated')90 self.failUnless(result.wasSuccessful())91 self.failUnlessEqual(result.testsRun, 1)92 self.failUnless(detests.DeferredTests.touched)93 def test_fail(self):94 result = self.runTest('test_fail')95 self.failIf(result.wasSuccessful())96 self.failUnlessEqual(result.testsRun, 1)97 self.failUnlessEqual(len(result.failures), 1)98 def test_failureInCallback(self):99 result = self.runTest('test_failureInCallback')100 self.failIf(result.wasSuccessful())101 self.failUnlessEqual(result.testsRun, 1)102 self.failUnlessEqual(len(result.failures), 1)103 def test_errorInCallback(self):104 result = self.runTest('test_errorInCallback')105 self.failIf(result.wasSuccessful())106 self.failUnlessEqual(result.testsRun, 1)107 self.failUnlessEqual(len(result.errors), 1)108 def test_skip(self):109 result = self.runTest('test_skip')110 self.failUnless(result.wasSuccessful())111 self.failUnlessEqual(result.testsRun, 1)112 self.failUnlessEqual(len(result.skips), 1)113 self.failIf(detests.DeferredTests.touched)114 def test_todo(self):115 result = self.runTest('test_expectedFailure')116 self.failUnless(result.wasSuccessful())117 self.failUnlessEqual(result.testsRun, 1)118 self.failUnlessEqual(len(result.errors), 0)119 self.failUnlessEqual(len(result.failures), 0)120 self.failUnlessEqual(len(result.expectedFailures), 1)121 def test_thread(self):122 result = self.runTest('test_thread')123 self.failUnlessEqual(result.testsRun, 1)124 self.failUnless(result.wasSuccessful(), result.errors)125class TestTimeout(TestTester):126 def getTest(self, name):127 return detests.TimeoutTests(name)128 def _wasTimeout(self, error):129 self.failUnlessEqual(error.check(defer.TimeoutError),130 defer.TimeoutError)131 def test_pass(self):132 result = self.runTest('test_pass')133 self.failUnless(result.wasSuccessful())134 self.failUnlessEqual(result.testsRun, 1)135 def test_passDefault(self):136 result = self.runTest('test_passDefault')137 self.failUnless(result.wasSuccessful())138 self.failUnlessEqual(result.testsRun, 1)139 def test_timeout(self):140 result = self.runTest('test_timeout')141 self.failIf(result.wasSuccessful())142 self.failUnlessEqual(result.testsRun, 1)143 self.failUnlessEqual(len(result.errors), 1)144 self._wasTimeout(result.errors[0][1])145 def test_timeoutZero(self):146 result = self.runTest('test_timeoutZero')147 self.failIf(result.wasSuccessful())148 self.failUnlessEqual(result.testsRun, 1)149 self.failUnlessEqual(len(result.errors), 1)150 self._wasTimeout(result.errors[0][1])151 def test_skip(self):152 result = self.runTest('test_skip')153 self.failUnless(result.wasSuccessful())154 self.failUnlessEqual(result.testsRun, 1)155 self.failUnlessEqual(len(result.skips), 1)156 def test_todo(self):157 result = self.runTest('test_expectedFailure')158 self.failUnless(result.wasSuccessful())159 self.failUnlessEqual(result.testsRun, 1)160 self.failUnlessEqual(len(result.expectedFailures), 1)161 self._wasTimeout(result.expectedFailures[0][1])162 def test_errorPropagation(self):163 result = self.runTest('test_errorPropagation')164 self.failIf(result.wasSuccessful())165 self.failUnlessEqual(result.testsRun, 1)166 self._wasTimeout(detests.TimeoutTests.timedOut)167 def test_classTimeout(self):168 loader = runner.TestLoader()169 suite = loader.loadClass(detests.TestClassTimeoutAttribute)170 result = reporter.TestResult()171 suite.run(result)172 self.failUnlessEqual(len(result.errors), 1)173 self._wasTimeout(result.errors[0][1])174 def test_callbackReturnsNonCallingDeferred(self):175 #hacky timeout176 # raises KeyboardInterrupt because Trial sucks177 from twisted.internet import reactor178 call = reactor.callLater(2, reactor.crash)179 result = self.runTest('test_calledButNeverCallback')180 if call.active():181 call.cancel()182 self.failIf(result.wasSuccessful())...

Full Screen

Full Screen

test.py

Source:test.py Github

copy

Full Screen

1#!/usr/bin/python32import unittest3from different_summands import optimal_summands4class OptimalSummandsProductTestCase(unittest.TestCase):5 @unittest.expectedFailure6 def test_with_preceeding_lower_bound_as_negative(self):7 optimal_summands(-1)8 @unittest.expectedFailure9 def test_with_preceeding_lower_bound_as_zero(self):10 optimal_summands(0)11 def test_with_1(self):12 n = 113 result = optimal_summands(n)14 self.assertEqual(1, len(result))15 self.assertEqual(1, result[0])16 self.assertEqual(n, sum(result))17 def test_with_2(self):18 n = 219 result = optimal_summands(n)20 self.assertEqual(1, len(result))21 self.assertEqual(2, result[0])22 self.assertEqual(n, sum(result))23 def test_with_3(self):24 n = 325 result = optimal_summands(n)26 self.assertEqual(2, len(result))27 self.assertEqual(1, result[0])28 self.assertEqual(2, result[1])29 self.assertEqual(n, sum(result))30 def test_with_4(self):31 n = 432 result = optimal_summands(n)33 self.assertEqual(2, len(result))34 self.assertEqual(1, result[0])35 self.assertEqual(3, result[1])36 self.assertEqual(n, sum(result))37 def test_with_5(self):38 n = 539 result = optimal_summands(n)40 self.assertEqual(2, len(result))41 self.assertEqual(1, result[0])42 self.assertEqual(4, result[1])43 self.assertEqual(n, sum(result))44 def test_with_6(self):45 n = 646 result = optimal_summands(n)47 self.assertEqual(3, len(result))48 self.assertEqual(1, result[0])49 self.assertEqual(2, result[1])50 self.assertEqual(3, result[2])51 self.assertEqual(n, sum(result))52 def test_with_7(self):53 n = 754 result = optimal_summands(n)55 self.assertEqual(3, len(result))56 self.assertEqual(1, result[0])57 self.assertEqual(2, result[1])58 self.assertEqual(4, result[2])59 self.assertEqual(n, sum(result))60 def test_with_8(self):61 n = 862 result = optimal_summands(n)63 self.assertEqual(3, len(result))64 self.assertEqual(1, result[0])65 self.assertEqual(2, result[1])66 self.assertEqual(5, result[2])67 self.assertEqual(n, sum(result))68 def test_with_9(self):69 n = 970 result = optimal_summands(n)71 self.assertEqual(3, len(result))72 self.assertEqual(1, result[0])73 self.assertEqual(2, result[1])74 self.assertEqual(6, result[2])75 self.assertEqual(n, sum(result))76 def test_with_10(self):77 n = 1078 result = optimal_summands(n)79 self.assertEqual(4, len(result))80 self.assertEqual(1, result[0])81 self.assertEqual(2, result[1])82 self.assertEqual(3, result[2])83 self.assertEqual(4, result[3])84 self.assertEqual(n, sum(result))85 def test_with_1000(self):86 n = 100087 result = optimal_summands(n)88 self.assertEqual(44, len(result))89 self.assertEqual(n, sum(result))90 def test_with_1000000(self):91 n = 100000092 result = optimal_summands(n)93 self.assertEqual(1413, len(result))94 self.assertEqual(n, sum(result))95 def test_with_1000000000(self):96 n = 100000000097 result = optimal_summands(n)98 self.assertEqual(44720, len(result))99 self.assertEqual(n, sum(result))100if __name__ == '__main__':...

Full Screen

Full Screen

Using AI Code Generation

copy

Full Screen

1var result = require('devicefarmer-stf-client').result;2result('pass');3var result = require('devicefarmer-stf-client').result;4result('fail');5var result = require('devicefarmer-stf-client').result;6result('pass', 'Test case passed');7var result = require('devicefarmer-stf-client').result;8result('fail', 'Test case failed');9var result = require('devicefarmer-stf-client').result;10result('pass', 'Test case passed', 'Test case passed');11var result = require('devicefarmer-stf-client').result;12result('fail', 'Test case failed', 'Test case failed');13var result = require('devicefarmer-stf-client').result;14result('pass', 'Test case passed', 'Test case passed', 'Test case passed');15var result = require('devicefarmer-stf-client').result;16result('fail', 'Test case failed', 'Test case failed', 'Test case failed');17var result = require('devicefarmer-stf-client').result;18result('pass', 'Test case passed', 'Test case passed', 'Test case passed', 'Test case passed');19var result = require('devicefarmer-stf-client').result;20result('fail', 'Test case failed', 'Test case failed', 'Test case failed', 'Test case failed');21var result = require('devicefarmer-stf-client').result;22result('pass', 'Test case passed', 'Test case passed', 'Test case passed', 'Test case passed', 'Test case passed');23var result = require('devicefarmer-stf-client').result;24result('fail', 'Test

Full Screen

Using AI Code Generation

copy

Full Screen

1var stf = require('devicefarmer-stf-client');2client.getDevices().then(function (devices) {3 console.log('Found ' + devices.length + ' devices');4 devices.forEach(function (device) {5 console.log('Found device ' + device.serial);6 device.reserve().then(function () {7 return device.connect();8 }).then(function () {9 return device.use();10 }).then(function () {11 return device.result();12 }).then(function () {13 return device.release();14 }).then(function () {15 console.log('Released device ' + device.serial);16 }).catch(function (err) {17 console.error('Failed to use device ' + device.serial, err);18 });19 });20}).catch(function (err) {21 console.error('Failed to get devices', err);22});23{24 "dependencies": {25 },26 "devDependencies": {},27 "scripts": {28 },29}30{31 "dependencies": {32 "devicefarmer-stf-client": {33 "requires": {

Full Screen

Using AI Code Generation

copy

Full Screen

1var stf = require('devicefarmer-stf');2client.getDevices().then(function (devices) {3 var device = devices[0];4 device.result('pass');5});6var stf = require('devicefarmer-stf');7var app = stf.app();8var server = app.listen(7100, function () {9 console.log('Listening on port %d', server.address().port);10});11{12 "dependencies": {13 },14 "devDependencies": {},15 "scripts": {16 },17}

Full Screen

Using AI Code Generation

copy

Full Screen

1var devicefarmer = require('devicefarmer-stf');2stf.result(1, function(err, result) {3 console.log(result);4});5var devicefarmer = require('devicefarmer-stf');6stf.result(1, function(err, result) {7 console.log(result);8});9var devicefarmer = require('devicefarmer-stf');10stf.result(1, function(err, result) {11 console.log(result);12});13var devicefarmer = require('devicefarmer-stf');14stf.result(1, function(err, result) {15 console.log(result);16});17var devicefarmer = require('devicefarmer-stf');18stf.result(1, function(err, result) {19 console.log(result);20});21var devicefarmer = require('devicefarmer-stf');22stf.result(1, function(err, result) {23 console.log(result);24});25var devicefarmer = require('devicefarmer-stf');26stf.result(1, function(err, result) {27 console.log(result);28});29var devicefarmer = require('devicefarmer-stf');30stf.result(1, function(err, result) {31 console.log(result);32});

Full Screen

Using AI Code Generation

copy

Full Screen

1var result = require('devicefarmer-stf-client').result;2var test = result.test;3var result = result.result;4var stf = require('devicefarmer-stf-client');5var stf = require('devicefarmer-stf-client');6var stf = require('devicefarmer-stf-client');7var stf = require('devicefarmer-stf-client');8var stf = require('devicefarmer-stf-client');9var stf = require('devicefarmer-stf-client');10var stf = require('devicefarmer-stf-client');11var stf = require('devicefarmer-stf-client');

Full Screen

Using AI Code Generation

copy

Full Screen

1var client = require('devicefarmer-stf-client').client;2var device = client.getDeviceBySerial('serialnumber');3device.result('pass', 'Test Passed', function(err, data) {4 if (err) {5 console.log(err);6 } else {7 console.log(data);8 }9});10var client = require('devicefarmer-stf-client').client;11var device = client.getDeviceBySerial('serialnumber');12device.result('fail', 'Test Failed', function(err, data) {13 if (err) {14 console.log(err);15 } else {16 console.log(data);17 }18});19var client = require('devicefarmer-stf-client').client;20var device = client.getDeviceBySerial('serialnumber');21 if (err) {22 console.log(err);23 } else {24 console.log(data);25 }26});27var client = require('devicefarmer-stf-client').client;28var device = client.getDeviceBySerial('serialnumber');29 if (err) {30 console.log(err);31 } else {32 console.log(data);33 }34});35var client = require('devicefarmer-stf-client').client;36var device = client.getDeviceBySerial('serialnumber');37 if (err) {38 console.log(err);39 } else {40 console.log(data);41 }42});43var client = require('devicefarmer-stf-client').client;44var device = client.getDeviceBySerial('serialnumber');45 if (err) {46 console.log(err);47 } else {

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run devicefarmer-stf automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful