How to use Catch method in argos

Best JavaScript code snippet using argos

02_threads.py

Source:02_threads.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2# Как создать и запустить поток3import random4import time5from collections import defaultdict6from threading import Thread7FISH = (None, 'плотва', 'окунь', 'лещ')8# Определим функцию, эмулирующую рыбалку9def fishing(name, worms):10 catch = defaultdict(int)11 for worm in range(worms):12 print(f'{name}: Червяк № {worm} - Забросил, ждем...', flush=True)13 _ = 3 ** (random.randint(50, 70) * 10000)14 fish = random.choice(FISH)15 if fish is None:16 print(f'{name}: Тьфу, сожрали червяка...', flush=True)17 else:18 print(f'{name}: Ага, у меня {fish}', flush=True)19 catch[fish] += 120 print(f'Итого рыбак {name} поймал:')21 for fish, count in catch.items():22 print(f' {fish} - {count}')23fishing(name='Вася', worms=10)24# А теперь создадим второго рыбака, пошедшего на рыбалку ОДНОВРЕМЕННО с первым25thread = Thread(target=fishing, kwargs=dict(name='Вася', worms=10))26thread.start()27fishing(name='Коля', worms=10)28thread.join()29# При простой передаче функции в поток результат выполнения функции30# можно получить только через изменяемый обьект в параметрах:31def fishing(name, worms, catch):32 for worm in range(worms):33 print(f'{name}: Червяк № {worm} - Забросил, ждем...', flush=True)34 _ = 3 ** (random.randint(50, 70) * 10000)35 fish = random.choice(FISH)36 if fish is None:37 print(f'{name}: Тьфу, сожрали червяка...', flush=True)38 else:39 print(f'{name}: Ага, у меня {fish}', flush=True)40 catch[fish] += 141vasya_catch = defaultdict(int)42thread = Thread(target=fishing, kwargs=dict(name='Вася', worms=10, catch=vasya_catch))43thread.start()44kolya_catch = defaultdict(int)45fishing(name='Коля', worms=10, catch=kolya_catch)46thread.join()47for name, catch in (('Вася', vasya_catch), ('Вася', kolya_catch)):48 print(f'Итого рыбак {name} поймал:')49 for fish, count in catch.items():50 print(f' {fish} - {count}')51# Более удобно использовать наследование от класса потока52class Fisher(Thread):53 def __init__(self, name, worms, *args, **kwargs):54 super(Fisher, self).__init__(*args, **kwargs)55 self.name = name56 self.worms = worms57 def run(self):58 catch = defaultdict(int)59 for worm in range(self.worms):60 print(f'{self.name}: Червяк № {worm} - Забросил, ждем...', flush=True)61 _ = 3 ** (random.randint(50, 70) * 10000)62 fish = random.choice(FISH)63 if fish is None:64 print(f'{self.name}: Тьфу, сожрали червяка...', flush=True)65 else:66 print(f'{self.name}: Ага, у меня {fish}', flush=True)67 catch[fish] += 168 print(f'Итого рыбак {self.name} поймал:')69 for fish, count in catch.items():70 print(f' {fish} - {count}')71vasya = Fisher(name='Вася', worms=10)72kolya = Fisher(name='Коля', worms=10)73print('.' * 20, 'Они пошли на рыбалку')74vasya.start()75kolya.start()76print('.' * 20, 'Ждем пока они вернутся...')77vasya.join()78kolya.join()79print('.' * 20, 'Итак, они вернулись')80# Если нужен результат выполнения, то просто делаем атрибут класса81class Fisher(Thread):82 def __init__(self, name, worms, *args, **kwargs):83 super().__init__(*args, **kwargs)84 self.name = name85 self.worms = worms86 self.catch = defaultdict(int)87 def run(self):88 self.catch = defaultdict(int)89 for worm in range(self.worms):90 print(f'{self.name}: Червяк № {worm} - Забросил, ждем...', flush=True)91 _ = 3 ** (random.randint(50, 70) * 10000)92 fish = random.choice(FISH)93 if fish is None:94 print(f'{self.name}: Тьфу, сожрали червяка...', flush=True)95 else:96 print(f'{self.name}: Ага, у меня {fish}', flush=True)97 self.catch[fish] += 198vasya = Fisher(name='Вася', worms=10)99kolya = Fisher(name='Коля', worms=10)100print('.' * 20, 'Они пошли на рыбалку')101vasya.start()102kolya.start()103print('.' * 20, 'Ждем пока они вернутся...')104vasya.join()105kolya.join()106print('.' * 20, 'Итак, они вернулись')107for fisher in (vasya, kolya):108 print(f'Итого рыбак {fisher.name} поймал:')109 for fish, count in fisher.catch.items():110 print(f' {fish} - {count}')111# Что будет если в потоке ошибка112class Fisher(Thread):113 def __init__(self, name, worms, *args, **kwargs):114 super().__init__(*args, **kwargs)115 self.name = name116 self.worms = worms117 self.catch = defaultdict(int)118 def run(self):119 self.catch = defaultdict(int)120 for worm in range(self.worms):121 print(f'{self.name}: Червяк № {worm} - Забросил, ждем...', flush=True)122 _ = 3 ** (random.randint(50, 70) * 10000)123 dice = random.randint(1, 5)124 if self.name == 'Коля' and dice == 1:125 raise ValueError(f'{self.name}: Блин, у меня сломалась удочка на {worm} червяке :(')126 fish = random.choice(FISH)127 if fish is None:128 print(f'{self.name}: Тьфу, сожрали червяка...', flush=True)129 else:130 print(f'{self.name}: Ага, у меня {fish}', flush=True)131 self.catch[fish] += 1132vasya = Fisher(name='Вася', worms=10)133kolya = Fisher(name='Коля', worms=10)134print('.' * 20, 'Они пошли на рыбалку')135vasya.start()136kolya.start()137print('.' * 20, 'Ждем пока они вернутся...')138vasya.join()139kolya.join()140print('.' * 20, 'Итак, они вернулись')141for fisher in (vasya, kolya):142 print(f'Итого рыбак {fisher.name} поймал:')143 for fish, count in fisher.catch.items():144 print(f' {fish} - {count}')145# Обрабатывать ошибки нужно в самом потоке146class Fisher(Thread):147 def __init__(self, name, worms, *args, **kwargs):148 super().__init__(*args, **kwargs)149 self.name = name150 self.worms = worms151 self.catch = defaultdict(int)152 def run(self):153 try:154 self._fishing()155 except Exception as exc:156 print(exc)157 def _fishing(self):158 self.catch = defaultdict(int)159 for worm in range(self.worms):160 print(f'{self.name}: Червяк № {worm} - Забросил, ждем...', flush=True)161 _ = 3 ** (random.randint(50, 70) * 10000)162 dice = random.randint(1, 5)163 if self.name == 'Коля' and dice == 1:164 raise ValueError(f'{self.name}: Блин, у меня сломалась удочка на {worm} червяке :(')165 fish = random.choice(FISH)166 if fish is None:167 print(f'{self.name}: Тьфу, сожрали червяка...', flush=True)168 else:169 print(f'{self.name}: Ага, у меня {fish}', flush=True)170 self.catch[fish] += 1171vasya = Fisher(name='Вася', worms=10)172kolya = Fisher(name='Коля', worms=10)173print('.' * 20, 'Они пошли на рыбалку')174vasya.start()175kolya.start()176print('.' * 20, 'Ждем пока они вернутся...')177vasya.join()178kolya.join()179print('.' * 20, 'Итак, они вернулись')180for fisher in (vasya, kolya):181 print(f'Итого рыбак {fisher.name} поймал:')182 for fish, count in fisher.catch.items():183 print(f' {fish} - {count}')184# Вроде все прекрасно, но в пайтоне есть суровый GIL - Global Interpreter Lock - https://goo.gl/MTokAe185# GIL велик и ужасен - это механизм блокировки выполнения потоков, пока один выполняется.186# Что, почему, зачем, Карл?! Зачем стрелять себе в ногу?187#188# Все дело в том, что сам пайтон - процесс в операционной системе. И ОС может приостанавливать выполнение189# самого процесса пайтона. А когда происходит двойное засыпание/просыпание,190# то возникает проблема доступа к одним и тем участкам памяти. Поэтому разработчики CPython,191# (а у нас именно эта реализация) решили сделать GIL.192#193# Поэтому нельзя получить выгоду в производительности программы в т.н. CPU-bounded алгоритмах194# (это те, которым требуется процессорное время и не нужны операции ввода/вывода)195class Fisher(Thread):196 def __init__(self, name, worms, *args, **kwargs):197 super().__init__(*args, **kwargs)198 self.name = name199 self.worms = worms200 self.catch = defaultdict(int)201 def run(self):202 self.catch = defaultdict(int)203 for worm in range(self.worms):204 _ = worm ** 10000 # фиксируем время ожидания поклевки205 fish = random.choice(FISH)206 if fish is not None:207 self.catch[fish] += 1208def time_track(func):209 def surrogate(*args, **kwargs):210 started_at = time.time()211 result = func(*args, **kwargs)212 ended_at = time.time()213 elapsed = round(ended_at - started_at, 6)214 print(f'Функция {func.__name__} работала {elapsed} секунд(ы)',)215 return result216 return surrogate217@time_track218def run_in_one_thread(fishers):219 for fisher in fishers:220 fisher.run()221@time_track222def run_in_threads(fishers):223 for fisher in fishers:224 fisher.start()225 for fisher in fishers:226 fisher.join()227humans = ['Васек', 'Колян', 'Петрович', 'Хмурый', 'Клава', ]228fishers = [Fisher(name=name, worms=100) for name in humans]229run_in_one_thread(fishers)230run_in_threads(fishers)231# Хорошая новость в том, что пайтон отпускает GIL перед системными вызовами. Чтение из файла, к примеру,232# может занимать длительное время и совершенно не требует GIL — можно дать шанс другим потокам поработать.233# ...234# Профит! там где есть операции ввода/ввывода: чтение с диска, обменд данными по сети, етс.235# Хорошая новость: любая программа должна обмениваться данными с внешним миром :)236class Fisher(Thread):237 def __init__(self, name, worms, *args, **kwargs):238 super().__init__(*args, **kwargs)239 self.name = name240 self.worms = worms241 self.catch = defaultdict(int)242 def run(self):243 self.catch = defaultdict(int)244 for worm in range(self.worms):245 time.sleep(0.01) # TODO тут вызов системной функции246 fish = random.choice(FISH)247 if fish is not None:248 self.catch[fish] += 1249def time_track(func):250 def surrogate(*args, **kwargs):251 started_at = time.time()252 result = func(*args, **kwargs)253 ended_at = time.time()254 elapsed = round(ended_at - started_at, 6)255 print(f'Функция {func.__name__} работала {elapsed} секунд(ы)',)256 return result257 return surrogate258@time_track259def run_in_one_thread(fishers):260 for fisher in fishers:261 fisher.run()262@time_track263def run_in_threads(fishers):264 for fisher in fishers:265 fisher.start()266 for fisher in fishers:267 fisher.join()268humans = ['Васек', 'Колян', 'Петрович', 'Хмурый', 'Клава', ]269fishers = [Fisher(name=name, worms=100) for name in humans]270run_in_one_thread(fishers)271run_in_threads(fishers)272# Про обьяснение механизма GIL очень рекомендую посмотреть видео с Moscow Python Meetup:273# Григорий Петров. "GIL в Python: зачем он нужен и как с этим жить"274# http://www.moscowpython.ru/meetup/14/gil-and-python-why/275###276# Извне завершить поток невозможно штатными средствами пайтона.277# И это правильно, вот две основные проблемы с "убийством" потока:278# - поток может активно работать с данными в этот момент и принудительное завершениие разрушит их целостность.279# - поток может породить другие потоки - их тоже завершать?280# Что делать? Можно добавить признак выхода:281class Fisher(Thread):282 def __init__(self, name, worms, *args, **kwargs):283 super().__init__(*args, **kwargs)284 self.name = name285 self.worms = worms286 self.catch = defaultdict(int)287 # будем проверять в цикле - а не пора ли нам заканчивать?288 self.need_stop = False289 def run(self):290 self.catch = defaultdict(int)291 for worm in range(self.worms):292 print(f'{self.name}: Червяк № {worm} - Забросил, ждем...', flush=True)293 _ = 3 ** (random.randint(50, 70) * 10000)294 fish = random.choice(FISH)295 if fish is None:296 print(f'{self.name}: Тьфу, сожрали червяка...', flush=True)297 else:298 print(f'{self.name}: Ага, у меня {fish}', flush=True)299 self.catch[fish] += 1300 if self.need_stop:301 print(f'{self.name}: Ой, жена ужинать зовет! Сматываем удочки...', flush=True)302 break303vasya = Fisher(name='Вася', worms=100)304vasya.start()305time.sleep(1)306if vasya.is_alive(): # кстати с помощью этого метода можно проверить выполняется ли еще поток?307 vasya.need_stop = True308vasya.join() # ожидание завершения обязательно - поток может некоторое время финализировать работу309# Подводя итог, нужно сказать, что в мультипоточном программированиии наши линейный код310# перестают быть линейным (СИНХРОННЫМ). Если ранее мы были уверены в последовательности выполнения кода,311# то при использовании потоков (процессов) код может выполняться АСИНХРОННО: нельзя гарантировать312# что за этим блоком кода будет выполняться вот этот....

Full Screen

Full Screen

test_virt_drivers.py

Source:test_virt_drivers.py Github

copy

Full Screen

1# vim: tabstop=4 shiftwidth=4 softtabstop=42#3# Copyright 2010 X7 LLC4#5# Licensed under the Apache License, Version 2.0 (the "License"); you may6# not use this file except in compliance with the License. You may obtain7# a copy of the License at8#9# http://www.apache.org/licenses/LICENSE-2.010#11# Unless required by applicable law or agreed to in writing, software12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the14# License for the specific language governing permissions and limitations15# under the License.16import base6417import netaddr18import sys19import traceback20from engine import exception21from engine import flags22from engine import image23from engine import log as logging24from engine import test25from engine.tests import utils as test_utils26libvirt = None27FLAGS = flags.FLAGS28LOG = logging.getLogger('engine.tests.test_virt_drivers')29def catch_notimplementederror(f):30 """Decorator to simplify catching drivers raising NotImplementedError31 If a particular call makes a driver raise NotImplementedError, we32 log it so that we can extract this information afterwards to33 automatically generate a hypervisor/feature support matrix."""34 def wrapped_func(self, *args, **kwargs):35 try:36 return f(self, *args, **kwargs)37 except NotImplementedError:38 frame = traceback.extract_tb(sys.exc_info()[2])[-1]39 LOG.error('%(driver)s does not implement %(method)s' % {40 'driver': type(self.connection),41 'method': frame[2]})42 wrapped_func.__name__ = f.__name__43 wrapped_func.__doc__ = f.__doc__44 return wrapped_func45class _VirtDriverTestCase(test.TestCase):46 def setUp(self):47 super(_VirtDriverTestCase, self).setUp()48 self.connection = self.driver_module.get_connection('')49 self.ctxt = test_utils.get_test_admin_context()50 self.image_service = image.get_default_image_service()51 def _get_running_instance(self):52 instance_ref = test_utils.get_test_instance()53 network_info = test_utils.get_test_network_info()54 image_info = test_utils.get_test_image_info(None, instance_ref)55 self.connection.spawn(self.ctxt, instance=instance_ref,56 image_meta=image_info,57 network_info=network_info)58 return instance_ref, network_info59 @catch_notimplementederror60 def test_init_host(self):61 self.connection.init_host('myhostname')62 @catch_notimplementederror63 def test_list_instances(self):64 self.connection.list_instances()65 @catch_notimplementederror66 def test_list_instances_detail(self):67 self.connection.list_instances_detail()68 @catch_notimplementederror69 def test_spawn(self):70 instance_ref, network_info = self._get_running_instance()71 domains = self.connection.list_instances()72 self.assertIn(instance_ref['name'], domains)73 domains_details = self.connection.list_instances_detail()74 self.assertIn(instance_ref['name'], [i.name for i in domains_details])75 @catch_notimplementederror76 def test_snapshot_not_running(self):77 instance_ref = test_utils.get_test_instance()78 img_ref = self.image_service.create(self.ctxt, {'name': 'snap-1'})79 self.assertRaises(exception.InstanceNotRunning,80 self.connection.snapshot,81 self.ctxt, instance_ref, img_ref['id'])82 @catch_notimplementederror83 def test_snapshot_running(self):84 img_ref = self.image_service.create(self.ctxt, {'name': 'snap-1'})85 instance_ref, network_info = self._get_running_instance()86 self.connection.snapshot(self.ctxt, instance_ref, img_ref['id'])87 @catch_notimplementederror88 def test_reboot(self):89 reboot_type = "SOFT"90 instance_ref, network_info = self._get_running_instance()91 self.connection.reboot(instance_ref, network_info, reboot_type)92 @catch_notimplementederror93 def test_get_host_ip_addr(self):94 host_ip = self.connection.get_host_ip_addr()95 # Will raise an exception if it's not a valid IP at all96 ip = netaddr.IPAddress(host_ip)97 # For now, assume IPv4.98 self.assertEquals(ip.version, 4)99 @catch_notimplementederror100 def test_resize_running(self):101 instance_ref, network_info = self._get_running_instance()102 self.connection.resize(instance_ref, 7)103 @catch_notimplementederror104 def test_set_admin_password(self):105 instance_ref, network_info = self._get_running_instance()106 self.connection.set_admin_password(instance_ref, 'p4ssw0rd')107 @catch_notimplementederror108 def test_inject_file(self):109 instance_ref, network_info = self._get_running_instance()110 self.connection.inject_file(instance_ref,111 base64.b64encode('/testfile'),112 base64.b64encode('testcontents'))113 @catch_notimplementederror114 def test_agent_update(self):115 instance_ref, network_info = self._get_running_instance()116 self.connection.agent_update(instance_ref, 'http://www.x7.org/',117 'd41d8cd98f00b204e9800998ecf8427e')118 @catch_notimplementederror119 def test_rescue(self):120 instance_ref, network_info = self._get_running_instance()121 self.connection.rescue(self.ctxt, instance_ref, network_info, None)122 @catch_notimplementederror123 def test_unrescue_unrescued_instance(self):124 instance_ref, network_info = self._get_running_instance()125 self.connection.unrescue(instance_ref, network_info)126 @catch_notimplementederror127 def test_unrescue_rescued_instance(self):128 instance_ref, network_info = self._get_running_instance()129 self.connection.rescue(self.ctxt, instance_ref, network_info, None)130 self.connection.unrescue(instance_ref, network_info)131 @catch_notimplementederror132 def test_poll_rebooting_instances(self):133 self.connection.poll_rebooting_instances(10)134 @catch_notimplementederror135 def test_poll_rescued_instances(self):136 self.connection.poll_rescued_instances(10)137 @catch_notimplementederror138 def test_poll_unconfirmed_resizes(self):139 self.connection.poll_unconfirmed_resizes(10)140 @catch_notimplementederror141 def test_migrate_disk_and_power_off(self):142 instance_ref, network_info = self._get_running_instance()143 instance_type_ref = test_utils.get_test_instance_type()144 self.connection.migrate_disk_and_power_off(145 self.ctxt, instance_ref, 'dest_host', instance_type_ref)146 @catch_notimplementederror147 def test_pause(self):148 instance_ref, network_info = self._get_running_instance()149 self.connection.pause(instance_ref)150 @catch_notimplementederror151 def test_unpause_unpaused_instance(self):152 instance_ref, network_info = self._get_running_instance()153 self.connection.unpause(instance_ref)154 @catch_notimplementederror155 def test_unpause_paused_instance(self):156 instance_ref, network_info = self._get_running_instance()157 self.connection.pause(instance_ref)158 self.connection.unpause(instance_ref)159 @catch_notimplementederror160 def test_suspend(self):161 instance_ref, network_info = self._get_running_instance()162 self.connection.suspend(instance_ref)163 @catch_notimplementederror164 def test_resume_unsuspended_instance(self):165 instance_ref, network_info = self._get_running_instance()166 self.connection.resume(instance_ref)167 @catch_notimplementederror168 def test_resume_suspended_instance(self):169 instance_ref, network_info = self._get_running_instance()170 self.connection.suspend(instance_ref)171 self.connection.resume(instance_ref)172 @catch_notimplementederror173 def test_destroy_instance_nonexistant(self):174 fake_instance = {'id': 42, 'name': 'I just made this up!'}175 network_info = test_utils.get_test_network_info()176 self.connection.destroy(fake_instance, network_info)177 @catch_notimplementederror178 def test_destroy_instance(self):179 instance_ref, network_info = self._get_running_instance()180 self.assertIn(instance_ref['name'],181 self.connection.list_instances())182 self.connection.destroy(instance_ref, network_info)183 self.assertNotIn(instance_ref['name'],184 self.connection.list_instances())185 @catch_notimplementederror186 def test_attach_detach_volume(self):187 instance_ref, network_info = self._get_running_instance()188 self.connection.attach_volume({'driver_volume_type': 'fake'},189 instance_ref['name'],190 '/mnt/engine/something')191 self.connection.detach_volume({'driver_volume_type': 'fake'},192 instance_ref['name'],193 '/mnt/engine/something')194 @catch_notimplementederror195 def test_get_info(self):196 instance_ref, network_info = self._get_running_instance()197 info = self.connection.get_info(instance_ref['name'])198 self.assertIn('state', info)199 self.assertIn('max_mem', info)200 self.assertIn('mem', info)201 self.assertIn('num_cpu', info)202 self.assertIn('cpu_time', info)203 @catch_notimplementederror204 def test_get_info_for_unknown_instance(self):205 self.assertRaises(exception.NotFound,206 self.connection.get_info, 'I just made this name up')207 @catch_notimplementederror208 def test_get_diagnostics(self):209 instance_ref, network_info = self._get_running_instance()210 self.connection.get_diagnostics(instance_ref['name'])211 @catch_notimplementederror212 def test_list_disks(self):213 instance_ref, network_info = self._get_running_instance()214 self.connection.list_disks(instance_ref['name'])215 @catch_notimplementederror216 def test_list_interfaces(self):217 instance_ref, network_info = self._get_running_instance()218 self.connection.list_interfaces(instance_ref['name'])219 @catch_notimplementederror220 def test_block_stats(self):221 instance_ref, network_info = self._get_running_instance()222 stats = self.connection.block_stats(instance_ref['name'], 'someid')223 self.assertEquals(len(stats), 5)224 @catch_notimplementederror225 def test_interface_stats(self):226 instance_ref, network_info = self._get_running_instance()227 stats = self.connection.interface_stats(instance_ref['name'], 'someid')228 self.assertEquals(len(stats), 8)229 @catch_notimplementederror230 def test_get_console_output(self):231 instance_ref, network_info = self._get_running_instance()232 console_output = self.connection.get_console_output(instance_ref)233 self.assertTrue(isinstance(console_output, basestring))234 @catch_notimplementederror235 def test_get_ajax_console(self):236 instance_ref, network_info = self._get_running_instance()237 ajax_console = self.connection.get_ajax_console(instance_ref)238 self.assertIn('token', ajax_console)239 self.assertIn('host', ajax_console)240 self.assertIn('port', ajax_console)241 @catch_notimplementederror242 def test_get_vnc_console(self):243 instance_ref, network_info = self._get_running_instance()244 vnc_console = self.connection.get_vnc_console(instance_ref)245 self.assertIn('token', vnc_console)246 self.assertIn('host', vnc_console)247 self.assertIn('port', vnc_console)248 @catch_notimplementederror249 def test_get_console_pool_info(self):250 instance_ref, network_info = self._get_running_instance()251 console_pool = self.connection.get_console_pool_info(instance_ref)252 self.assertIn('address', console_pool)253 self.assertIn('username', console_pool)254 self.assertIn('password', console_pool)255 @catch_notimplementederror256 def test_refresh_security_group_rules(self):257 # FIXME: Create security group and add the instance to it258 instance_ref, network_info = self._get_running_instance()259 self.connection.refresh_security_group_rules(1)260 @catch_notimplementederror261 def test_refresh_security_group_members(self):262 # FIXME: Create security group and add the instance to it263 instance_ref, network_info = self._get_running_instance()264 self.connection.refresh_security_group_members(1)265 @catch_notimplementederror266 def test_refresh_provider_fw_rules(self):267 instance_ref, network_info = self._get_running_instance()268 self.connection.refresh_provider_fw_rules()269 @catch_notimplementederror270 def test_update_available_resource(self):271 self.compute = self.start_service('compute', host='dummy')272 self.connection.update_available_resource(self.ctxt, 'dummy')273 @catch_notimplementederror274 def test_compare_cpu(self):275 cpu_info = '''{ "topology": {276 "sockets": 1,277 "cores": 2,278 "threads": 1 },279 "features": [280 "xtpr",281 "tm2",282 "est",283 "vmx",284 "ds_cpl",285 "monitor",286 "pbe",287 "tm",288 "ht",289 "ss",290 "acpi",291 "ds",292 "vme"],293 "arch": "x86_64",294 "model": "Penryn",295 "vendor": "Intel" }'''296 self.connection.compare_cpu(cpu_info)297 @catch_notimplementederror298 def test_ensure_filtering_for_instance(self):299 instance_ref = test_utils.get_test_instance()300 network_info = test_utils.get_test_network_info()301 self.connection.ensure_filtering_rules_for_instance(instance_ref,302 network_info)303 @catch_notimplementederror304 def test_unfilter_instance(self):305 instance_ref = test_utils.get_test_instance()306 network_info = test_utils.get_test_network_info()307 self.connection.unfilter_instance(instance_ref, network_info)308 @catch_notimplementederror309 def test_live_migration(self):310 instance_ref, network_info = self._get_running_instance()311 self.connection.live_migration(self.ctxt, instance_ref, 'otherhost',312 None, None)313 @catch_notimplementederror314 def _check_host_status_fields(self, host_status):315 self.assertIn('disk_total', host_status)316 self.assertIn('disk_used', host_status)317 self.assertIn('host_memory_total', host_status)318 self.assertIn('host_memory_free', host_status)319 @catch_notimplementederror320 def test_update_host_status(self):321 host_status = self.connection.update_host_status()322 self._check_host_status_fields(host_status)323 @catch_notimplementederror324 def test_get_host_stats(self):325 host_status = self.connection.get_host_stats()326 self._check_host_status_fields(host_status)327 @catch_notimplementederror328 def test_set_host_enabled(self):329 self.connection.set_host_enabled('a useless argument?', True)330 @catch_notimplementederror331 def test_host_power_action_reboot(self):332 self.connection.host_power_action('a useless argument?', 'reboot')333 @catch_notimplementederror334 def test_host_power_action_shutdown(self):335 self.connection.host_power_action('a useless argument?', 'shutdown')336 @catch_notimplementederror337 def test_host_power_action_startup(self):338 self.connection.host_power_action('a useless argument?', 'startup')339class AbstractDriverTestCase(_VirtDriverTestCase):340 def setUp(self):341 import engine.virt.driver342 self.driver_module = engine.virt.driver343 def get_driver_connection(_):344 return engine.virt.driver.ComputeDriver()345 self.driver_module.get_connection = get_driver_connection346 super(AbstractDriverTestCase, self).setUp()347class FakeConnectionTestCase(_VirtDriverTestCase):348 def setUp(self):349 import engine.virt.fake350 self.driver_module = engine.virt.fake351 super(FakeConnectionTestCase, self).setUp()352class LibvirtConnTestCase(_VirtDriverTestCase):353 def setUp(self):354 # Put fakelibvirt in place355 if 'libvirt' in sys.modules:356 self.saved_libvirt = sys.modules['libvirt']357 else:358 self.saved_libvirt = None359 import fakelibvirt360 import fake_libvirt_utils361 sys.modules['libvirt'] = fakelibvirt362 import engine.virt.libvirt.connection363 import engine.virt.libvirt.firewall364 engine.virt.libvirt.connection.libvirt = fakelibvirt365 engine.virt.libvirt.connection.libvirt_utils = fake_libvirt_utils366 engine.virt.libvirt.firewall.libvirt = fakelibvirt367 # Point _VirtDriverTestCase at the right module368 self.driver_module = engine.virt.libvirt.connection369 super(LibvirtConnTestCase, self).setUp()370 FLAGS.rescue_image_id = "2"371 FLAGS.rescue_kernel_id = "3"372 FLAGS.rescue_ramdisk_id = None373 def tearDown(self):374 super(LibvirtConnTestCase, self).setUp()375 # Restore libvirt376 import engine.virt.libvirt.connection377 import engine.virt.libvirt.firewall378 if self.saved_libvirt:379 sys.modules['libvirt'] = self.saved_libvirt380 engine.virt.libvirt.connection.libvirt = self.saved_libvirt381 engine.virt.libvirt.connection.libvirt_utils = self.saved_libvirt...

Full Screen

Full Screen

common_catch_and_log.py

Source:common_catch_and_log.py Github

copy

Full Screen

1# Copyright 2014 Google Inc. All Rights Reserved.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS-IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14"""Unit tests for logger."""15__author__ = 'Mike Gainer (mgainer@google.com)'16import datetime17import unittest18import appengine_config19from common import catch_and_log20class CatchAndLogTests(unittest.TestCase):21 def setUp(self):22 appengine_config.PRODUCTION_MODE = False23 self._catch_and_log = catch_and_log.CatchAndLog()24 self._expected = []25 def test_simple(self):26 complaint = 'No cheese, Gromit!'27 self._catch_and_log.critical(complaint)28 self._expect(complaint, catch_and_log._CRITICAL)29 self._assert_logs_match()30 def test_multiple(self):31 complaints = [32 'Failed to find cheese in pantry',33 'Failed to install cheese to mousetrap',34 'Failed to arm trap',35 'Failed to catch mouse'36 ]37 for complaint in complaints:38 self._catch_and_log.critical(complaint)39 self._expect(complaint, catch_and_log._CRITICAL)40 self._assert_logs_match()41 def test_multiple_levels(self):42 complaint = 'No cheese, Gromit!'43 self._catch_and_log.critical(complaint)44 self._expect(complaint, catch_and_log._CRITICAL)45 complaint = 'Moon out of range!'46 self._catch_and_log.warn(complaint)47 self._expect(complaint, catch_and_log._WARNING)48 complaint = 'Parking brake engaged!'49 self._catch_and_log.warning(complaint)50 self._expect(complaint, catch_and_log._WARNING)51 complaint = 'Five mice spectating'52 self._catch_and_log.info(complaint)53 self._expect(complaint, catch_and_log._INFO)54 complaint = 'Red light blinking'55 self._catch_and_log.info(complaint)56 self._expect(complaint, catch_and_log._INFO)57 complaint = 'Low blinker fluid'58 self._catch_and_log.warning(complaint)59 self._expect(complaint, catch_and_log._WARNING)60 complaint = 'Insufficient fuel for landing!'61 self._catch_and_log.critical(complaint)62 self._expect(complaint, catch_and_log._CRITICAL)63 self._assert_logs_match()64 def test_exception_suppressed(self):65 topic = 'Entering Orbit'66 complaint = 'Perigee below surface of Moon!'67 with self._catch_and_log.consume_exceptions(topic):68 raise ValueError(complaint)69 self._expect(70 '%s: ValueError: %s at ' % (topic, complaint) +71 'File "/tests/unit/common_catch_and_log.py", line 86, in '72 'test_exception_suppressed\n raise ValueError(complaint)\n',73 catch_and_log._CRITICAL)74 self._assert_logs_match()75 def test_exception_propagates(self):76 topic = 'Entering Orbit'77 complaint = 'Perigee below surface of Moon!'78 with self.assertRaises(ValueError):79 with self._catch_and_log.propagate_exceptions(topic):80 raise ValueError(complaint)81 self._expect(82 '%s: ValueError: %s at ' % (topic, complaint) +83 'File "/tests/unit/common_catch_and_log.py", line 100, in '84 'test_exception_propagates\n raise ValueError(complaint)\n',85 catch_and_log._CRITICAL)86 self._assert_logs_match()87 def test_traceback_info_suppressed_in_production(self):88 appengine_config.PRODUCTION_MODE = True89 topic = 'Entering Orbit'90 complaint = 'Perigee below surface of Moon!'91 with self._catch_and_log.consume_exceptions(topic):92 raise ValueError(complaint)93 self._expect('%s: ValueError: %s' % (topic, complaint),94 catch_and_log._CRITICAL)95 self._assert_logs_match()96 def _expect(self, message, level):97 self._expected.append({98 'message': message,99 'level': level,100 'timestamp': datetime.datetime.now().strftime(101 catch_and_log._LOG_DATE_FORMAT)})102 def _assert_logs_match(self):103 if len(self._expected) != len(self._catch_and_log.get()):104 self.fail('Expected %d entries, but have %d' %105 (len(self._expected), len(self._catch_and_log.get())))106 for expected, actual in zip(self._expected, self._catch_and_log.get()):107 self.assertEquals(expected['level'], actual['level'])108 self.assertEquals(expected['message'], actual['message'])109 expected_time = datetime.datetime.strptime(110 expected['timestamp'], catch_and_log._LOG_DATE_FORMAT)111 actual_time = datetime.datetime.strptime(112 actual['timestamp'], catch_and_log._LOG_DATE_FORMAT)113 self.assertAlmostEqual(...

Full Screen

Full Screen

__init__.py

Source:__init__.py Github

copy

Full Screen

1# coding=utf-82from __future__ import unicode_literals3from .. import Provider as CompanyProvider4class Provider(CompanyProvider):5 formats = (6 '{{last_name}} {{company_suffix}}',7 '{{last_name}} {{last_name}} {{company_suffix}}',8 '{{last_name}}',9 '{{last_name}}',10 )11 catch_phrase_formats = (12 '{{catch_phrase_noun}} {{catch_phrase_verb}} {{catch_phrase_attribute}}', )13 nouns = (14 'la sécurité',15 'le plaisir',16 'le confort',17 'la simplicité',18 "l'assurance",19 "l'art",20 'le pouvoir',21 'le droit',22 'la possibilité',23 "l'avantage",24 'la liberté')25 verbs = (26 'de rouler',27 "d'avancer",28 "d'évoluer",29 'de changer',30 "d'innover",31 'de louer',32 "d'atteindre vos buts",33 'de concrétiser vos projets')34 attributes = (35 'de manière efficace',36 'plus rapidement',37 'plus facilement',38 'plus simplement',39 'en toute tranquilité',40 'avant-tout',41 'autrement',42 'naturellement',43 'à la pointe',44 'sans soucis',45 "à l'état pur",46 'à sa source',47 'de manière sûre',48 'en toute sécurité')49 company_suffixes = ('SA', 'S.A.', 'SARL', 'S.A.R.L.', 'S.A.S.', 'et Fils')50 siren_format = "### ### ###"51 def catch_phrase_noun(self):52 """53 Returns a random catch phrase noun.54 """55 return self.random_element(self.nouns)56 def catch_phrase_attribute(self):57 """58 Returns a random catch phrase attribute.59 """60 return self.random_element(self.attributes)61 def catch_phrase_verb(self):62 """63 Returns a random catch phrase verb.64 """65 return self.random_element(self.verbs)66 def catch_phrase(self):67 """68 :example 'integrate extensible convergence'69 """70 catch_phrase = ""71 while True:72 pattern = self.random_element(self.catch_phrase_formats)73 catch_phrase = self.generator.parse(pattern)74 catch_phrase = catch_phrase[0].upper() + catch_phrase[1:]75 if self._is_catch_phrase_valid(catch_phrase):76 break77 return catch_phrase78 # An array containing string which should not appear twice in a catch phrase79 words_which_should_not_appear_twice = ('sécurité', 'simpl')80 def _is_catch_phrase_valid(self, catch_phrase):81 """82 Validates a french catch phrase.83 :param catch_phrase: The catch phrase to validate.84 """85 for word in self.words_which_should_not_appear_twice:86 # Fastest way to check if a piece of word does not appear twice.87 begin_pos = catch_phrase.find(word)88 end_pos = catch_phrase.find(word, begin_pos + 1)89 if begin_pos != -1 and begin_pos != end_pos:90 return False91 return True92 def siren(self):93 """94 Generates a siren number (9 digits).95 """96 return self.numerify(self.siren_format)97 def siret(self, max_sequential_digits=2):98 """99 Generates a siret number (14 digits).100 It is in fact the result of the concatenation of a siren number (9 digits),101 a sequential number (4 digits) and a control number (1 digit) concatenation.102 If $max_sequential_digits is invalid, it is set to 2.103 :param max_sequential_digits The maximum number of digits for the sequential number (> 0 && <= 4).104 """105 if max_sequential_digits > 4 or max_sequential_digits <= 0:106 max_sequential_digits = 2107 sequential_number = str(self.random_number(108 max_sequential_digits)).zfill(4)...

Full Screen

Full Screen

test_helpers.py

Source:test_helpers.py Github

copy

Full Screen

1#!/usr/bin/env python2# This Source Code Form is subject to the terms of the Mozilla Public3# License, v. 2.0. If a copy of the MPL was not distributed with this file,4# You can obtain one at http://mozilla.org/MPL/2.0/.5import sys6import unittest7import os8from lib.helpers import *9from mock import call, MagicMock10dirname = os.path.dirname(os.path.realpath(__file__))11sys.path.append(os.path.join(dirname, '..'))12class RetryFunc():13 def __init__(self):14 self.ran = 015 self.calls = []16 self.err = UserWarning17 def succeed(self, count):18 self.ran = self.ran + 119 self.calls.append(count)20 def fail(self, count):21 self.ran = self.ran + 122 self.calls.append(count)23 raise self.err24class TestRetryFunc(unittest.TestCase):25 def setUp(self):26 self.retry_func = RetryFunc()27 self.catch_func = RetryFunc()28 self.example_method = MagicMock()29 self.throw_exception = False30 def run_logic(self):31 self.example_method('run_logic1')32 if self.throw_exception:33 self.throw_exception = False34 raise UserWarning('error')35 self.example_method('run_logic2')36 def handler_logic(self):37 self.example_method('handler_logic')38 def exception_caught(self):39 self.example_method('exception_handler1')40 retry_func(41 lambda ran: self.handler_logic(),42 catch_func=lambda ran: self.exception_caught(),43 catch=UserWarning, retries=344 )45 self.example_method('exception_handler2')46 def test_passes_retry_count(self):47 self.assertRaises(48 self.retry_func.err,49 retry_func,50 self.retry_func.fail,51 catch=UserWarning, retries=352 )53 self.assertEqual(self.retry_func.calls, [0, 1, 2, 3])54 def test_retries_on_fail(self):55 self.assertRaises(56 self.retry_func.err,57 retry_func,58 self.retry_func.fail,59 catch=UserWarning, retries=360 )61 self.assertEqual(self.retry_func.ran, 4)62 def test_run_catch_func_on_fail(self):63 self.assertRaises(64 self.retry_func.err,65 retry_func,66 self.retry_func.fail,67 catch_func=self.catch_func.succeed,68 catch=UserWarning, retries=369 )70 self.assertEqual(self.catch_func.ran, 4)71 def test_no_retry_on_success(self):72 retry_func(73 self.retry_func.succeed,74 catch=UserWarning, retries=375 )76 self.assertEqual(self.retry_func.ran, 1)77 def test_no_run_catch_func_on_success(self):78 retry_func(79 self.retry_func.succeed,80 catch_func=self.catch_func.succeed,81 catch=UserWarning, retries=382 )83 self.assertEqual(self.catch_func.ran, 0)84 def test_runs_logic_and_handler_in_proper_order(self):85 self.throw_exception = True86 retry_func(87 lambda ran: self.run_logic(),88 catch_func=lambda ran: self.exception_caught(),89 catch=UserWarning, retries=190 )91 expected = [call('run_logic1'),92 call('exception_handler1'),93 call('handler_logic'),94 call('exception_handler2'),95 call('run_logic1'),96 call('run_logic2')]97 self.assertEqual(expected, self.example_method.mock_calls)98if __name__ == '__main__':...

Full Screen

Full Screen

Motion.py

Source:Motion.py Github

copy

Full Screen

1from direct.fsm import StateData2from toontown.toonbase import ToontownGlobals3from direct.directnotify import DirectNotifyGlobal4from direct.fsm import ClassicFSM, State5from direct.fsm import State6import TTEmote7from otp.avatar import Emote8class Motion(StateData.StateData):9 notify = DirectNotifyGlobal.directNotify.newCategory('Motion')10 def __init__(self, toon):11 self.lt = toon12 self.doneEvent = 'motionDone'13 StateData.StateData.__init__(self, self.doneEvent)14 self.fsm = ClassicFSM.ClassicFSM('Motion', [State.State('off', self.enterOff, self.exitOff),15 State.State('neutral', self.enterNeutral, self.exitNeutral),16 State.State('walk', self.enterWalk, self.exitWalk),17 State.State('run', self.enterRun, self.exitRun),18 State.State('sad-neutral', self.enterSadNeutral, self.exitSadNeutral),19 State.State('sad-walk', self.enterSadWalk, self.exitSadWalk),20 State.State('catch-neutral', self.enterCatchNeutral, self.exitCatchNeutral),21 State.State('catch-run', self.enterCatchRun, self.exitCatchRun),22 State.State('catch-eatneutral', self.enterCatchEatNeutral, self.exitCatchEatNeutral),23 State.State('catch-eatnrun', self.enterCatchEatNRun, self.exitCatchEatNRun)], 'off', 'off')24 self.fsm.enterInitialState()25 def delete(self):26 del self.fsm27 def load(self):28 pass29 def unload(self):30 pass31 def enter(self):32 self.notify.debug('enter')33 def exit(self):34 self.fsm.requestFinalState()35 def enterOff(self, rate = 0):36 self.notify.debug('enterOff')37 def exitOff(self):38 pass39 def enterNeutral(self, rate = 0):40 self.notify.debug('enterNeutral')41 def exitNeutral(self):42 self.notify.debug('exitNeutral')43 def enterWalk(self, rate = 0):44 self.notify.debug('enterWalk')45 Emote.globalEmote.disableBody(self.lt, 'enterWalk')46 def exitWalk(self):47 self.notify.debug('exitWalk')48 Emote.globalEmote.releaseBody(self.lt, 'exitWalk')49 def enterRun(self, rate = 0):50 self.notify.debug('enterRun')51 Emote.globalEmote.disableBody(self.lt, 'enterRun')52 def exitRun(self):53 self.notify.debug('exitRun')54 Emote.globalEmote.releaseBody(self.lt, 'exitRun')55 def enterSadNeutral(self, rate = 0):56 self.notify.debug('enterSadNeutral')57 def exitSadNeutral(self):58 self.notify.debug('exitSadNeutral')59 def enterSadWalk(self, rate = 0):60 self.notify.debug('enterSadWalk')61 def exitSadWalk(self):62 pass63 def enterCatchNeutral(self, rate = 0):64 self.notify.debug('enterCatchNeutral')65 def exitCatchNeutral(self):66 self.notify.debug('exitCatchNeutral')67 def enterCatchRun(self, rate = 0):68 self.notify.debug('enterCatchRun')69 def exitCatchRun(self):70 self.notify.debug('exitCatchRun')71 def enterCatchEatNeutral(self, rate = 0):72 self.notify.debug('enterCatchEatNeutral')73 def exitCatchEatNeutral(self):74 self.notify.debug('exitCatchEatNeutral')75 def enterCatchEatNRun(self, rate = 0):76 self.notify.debug('enterCatchEatNRun')77 def exitCatchEatNRun(self):78 self.notify.debug('exitCatchEatNRun')79 def setState(self, anim, rate):80 toon = self.lt81 if toon.playingAnim != anim:...

Full Screen

Full Screen

runner.py

Source:runner.py Github

copy

Full Screen

1#!/usr/bin/python -tt2#3# Copyright (c) 2011 Intel, Inc.4#5# This program is free software; you can redistribute it and/or modify it6# under the terms of the GNU General Public License as published by the Free7# Software Foundation; version 2 of the License8#9# This program is distributed in the hope that it will be useful, but10# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY11# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License12# for more details.13#14# You should have received a copy of the GNU General Public License along15# with this program; if not, write to the Free Software Foundation, Inc., 5916# Temple Place - Suite 330, Boston, MA 02111-1307, USA.17import os18import subprocess19from mic import msger20def runtool(cmdln_or_args, catch=1):21 """ wrapper for most of the subprocess calls22 input:23 cmdln_or_args: can be both args and cmdln str (shell=True)24 catch: 0, quitely run25 1, only STDOUT26 2, only STDERR27 3, both STDOUT and STDERR28 return:29 (rc, output)30 if catch==0: the output will always None31 """32 if catch not in (0, 1, 2, 3):33 # invalid catch selection, will cause exception, that's good34 return None35 if isinstance(cmdln_or_args, list):36 cmd = cmdln_or_args[0]37 shell = False38 else:39 import shlex40 cmd = shlex.split(cmdln_or_args)[0]41 shell = True42 if catch != 3:43 dev_null = os.open("/dev/null", os.O_WRONLY)44 if catch == 0:45 sout = dev_null46 serr = dev_null47 elif catch == 1:48 sout = subprocess.PIPE49 serr = dev_null50 elif catch == 2:51 sout = dev_null52 serr = subprocess.PIPE53 elif catch == 3:54 sout = subprocess.PIPE55 serr = subprocess.STDOUT56 try:57 p = subprocess.Popen(cmdln_or_args, stdout=sout,58 stderr=serr, shell=shell)59 (sout, serr) = p.communicate()60 # combine stdout and stderr, filter None out61 out = ''.join(filter(None, [sout, serr]))62 except OSError, e:63 if e.errno == 2:64 # [Errno 2] No such file or directory65 msger.error('Cannot run command: %s, lost dependency?' % cmd)66 else:67 raise # relay68 finally:69 if catch != 3:70 os.close(dev_null)71 return (p.returncode, out)72def show(cmdln_or_args):73 # show all the message using msger.verbose74 rc, out = runtool(cmdln_or_args, catch=3)75 if isinstance(cmdln_or_args, list):76 cmd = ' '.join(cmdln_or_args)77 else:78 cmd = cmdln_or_args79 msg = 'running command: "%s"' % cmd80 if out: out = out.strip()81 if out:82 msg += ', with output::'83 msg += '\n +----------------'84 for line in out.splitlines():85 msg += '\n | %s' % line86 msg += '\n +----------------'87 msger.verbose(msg)88 return rc89def outs(cmdln_or_args, catch=1):90 # get the outputs of tools91 return runtool(cmdln_or_args, catch)[1].strip()92def quiet(cmdln_or_args):...

Full Screen

Full Screen

location.py

Source:location.py Github

copy

Full Screen

1#_*_ coding=utf-8 _*_2from selenium import webdriver3from selenium.webdriver.support.ui import Select4from selenium.webdriver.common.action_chains import ActionChains5from poseidon.base import CommonBase as cb6'''7简易封装定位单个元素和一组元素的方法8'''9"""定位单个元素"""10@cb.com_try_catch11def findId(driver, id):12 f = driver.find_element_by_id(id)13 return f14@cb.com_try_catch15def findName(driver, name):16 f = driver.find_element_by_name(name)17 return f18@cb.com_try_catch19def findClassName(driver, name):20 f = driver.find_element_by_class_name(name)21 return f22@cb.com_try_catch23def findTagName(driver, name):24 f = driver.find_element_by_tag_name(name)25 return f26@cb.com_try_catch27def findLinkText(driver, text):28 f = driver.find_element_by_link_text(text)29 return f30@cb.com_try_catch31def findPLinkText(driver, text):32 f = driver.find_element_by_partial_link_text(text)33 return f34@cb.com_try_catch35def findXpath(driver, xpath):36 f = driver.find_element_by_xpath(xpath)37 return f38@cb.com_try_catch39def findCss(driver, css):40 f = driver.find_element_by_css_selector(css)41 return f42'''定位一组元素'''43@cb.com_try_catch44def findsId(driver, id):45 f = driver.find_elements_by_id(id)46 return f47@cb.com_try_catch48def findsName(driver, name):49 f = driver.find_elements_by_name(name)50 return f51@cb.com_try_catch52def findsClassName(driver, name):53 f = driver.find_elements_by_class_name(name)54 return f55@cb.com_try_catch56def findsTagName(driver, name):57 f = driver.find_elements_by_tag_name(name)58 return f59@cb.com_try_catch60def findsLinkText(driver, text):61 f = driver.find_elements_by_link_text(text)62 return f63@cb.com_try_catch64def findsPLinkText(driver, text):65 f = driver.find_elements_by_partial_link_text(text)66 return f67@cb.com_try_catch68def findsXpath(driver, xpath):69 f = driver.find_elements_by_xpath(xpath)70 return f71@cb.com_try_catch72def findsCss(driver, css):73 f = driver.find_elements_by_css_selector(css)74 return f75def findDropdown(driver, xpath, index=0, tag_name="li"):76 """77 :param driver:78 :param xpath: 下拉列表的xpath(或其他定位)定位79 :return:80 """81 element = findXpath(driver, xpath)82 if element.tag_name.lower() != "select":83 findXpath(element, tag_name + "[" + str(index) + "]").click()84 else:85 Select(findsXpath(driver, element)).select_by_index(index)86 # aa = findsXpath(temp, "li")87 # print "success is %s" % len(aa)88def clikUnClickableElement(driver, xpath):89 f = findXpath(driver, xpath)...

Full Screen

Full Screen

Using AI Code Generation

copy

Full Screen

1var argosy = require('argosy')2var catchArgosy = require('catch-argosy')3var argosyService = argosy()4var catchService = catchArgosy(argosyService)5catchService.catch(function (err) {6 console.log("error caught", err)7})8var argosyPattern = require('argosy-pattern')9var catchPattern = require('catch-argosy-pattern')10var patternService = argosyPattern()11var catchService = catchPattern(patternService)12catchService.catch(function (err) {13 console.log("error caught", err)14})15### CatchArgosy(argosyService)16### CatchPattern(patternService)17### catchArgosy.catch(callback)18### catchPattern.catch(callback)

Full Screen

Using AI Code Generation

copy

Full Screen

1const argosy = require('argosy')2const pattern = require('argosy-pattern')3const service = argosy()4service.accept({5 add: pattern.match({6 })7}, function (msg, cb) {8 cb(null, msg.a + msg.b)9})10service.accept({11 catch: pattern.catch()12}, function (msg, cb) {13 cb(null, 'I caught an error')14})15service.listen(8000)16const argosy = require('argosy')17const pattern = require('argosy-pattern')18const service = argosy()19service.accept({20 add: pattern.match({21 })22}, function (msg, cb) {23 cb(null, msg.a + msg.b)24})25service.accept({26 catch: pattern.catch()27}, function (msg, cb) {28 cb(null, 'I caught an error')29})30service.listen(8000)31const argosy = require('argosy')32const pattern = require('argosy-pattern')33const service = argosy()34service.accept({35 add: pattern.match({36 })37}, function (msg, cb) {38 cb(null, msg.a + msg.b)39})40service.accept({41 catch: pattern.catch()42}, function (msg, cb) {43 cb(null, 'I caught an error')44})45service.listen(8000)46const argosy = require('argosy')47const pattern = require('argosy-pattern')48const service = argosy()49service.accept({50 add: pattern.match({51 })52}, function (msg, cb) {53 cb(null, msg.a + msg.b)54})55service.accept({56 catch: pattern.catch()57}, function (msg, cb) {58 cb(null, 'I caught an error')59})60service.listen(8000)61const argosy = require('argosy')

Full Screen

Using AI Code Generation

copy

Full Screen

1var argosy = require('argosy')2var argosyPattern = require('argosy-pattern')3var argosyCatch = require('argosy-catch')4var patterns = {5}6var options = {7 catch: function (err, context) {8 console.log(err)9 }10}11var service = argosy()12 .use(argosyCatch(options))13 .use(function (request, respond) {14 respond(null, 'hello world')15 })16service.accept({17})18service.connect({19})20service.request({21}, function (err, response) {22})23service.request({24}, function (err, response) {25})

Full Screen

Using AI Code Generation

copy

Full Screen

1var argosy = require('argosy')2var pattern = require('argosy-pattern')3var service = argosy()4service.accept(catchPattern({5}, function (args, cb) {6 cb(null, args.message)7}))8service.on('error', function (err) {9 console.log(err)10})11service.pipe(service)12### argosy-pattern.not(pattern)13var argosy = require('argosy')14var pattern = require('argosy-pattern')15var service = argosy()16service.accept(notPattern({17}, function (args, cb) {18 cb(null, args.message)19}))20service.on('error', function (err) {21 console.log(err)22})23service.pipe(service)24### argosy-pattern.any(pattern)25var argosy = require('argosy')26var pattern = require('argosy-pattern')27var service = argosy()28service.accept(anyPattern([29 {30 },31 {32 }33], function (args, cb) {34 cb(null, args.message)35}))36service.on('error', function (err) {37 console.log(err)38})39service.pipe(service)40### argosy-pattern.all(pattern)41var argosy = require('argosy')42var pattern = require('argosy-pattern')43var service = argosy()44service.accept(allPattern([45 {46 },47 {48 }49], function (args, cb) {50 cb(null, args.message)51}))52service.on('error', function (err) {53 console.log(err)54})

Full Screen

Using AI Code Generation

copy

Full Screen

1var argos = require('argos');2argos.catch(function(err) {3 console.log(err);4});5var argos = require('argos');6argos.catch(function(err) {7 console.log(err);8});9var argos = require('argos');10argos.catch(function(err) {11 console.log(err);12});13var argos = require('argos');14argos.catch(function(err) {15 console.log(err);16});17var argos = require('argos');18argos.catch(function(err) {19 console.log(err);20});21var argos = require('argos');22argos.catch(function(err) {23 console.log(err);24});

Full Screen

Using AI Code Generation

copy

Full Screen

1var argosy = require('argosy');2var argosyPattern = require('argosy-pattern');3var catchPattern = argosyPattern.catch;4var catchMethod = argosy.catch;5var service = argosy();6catchMethod(service);7service.accept(catchPattern, function (message, callback) {8 console.log('Error: ' + message);9 callback(null, 'OK');10});11service.listen();12var client = argosy();13client.pipe(service).pipe(client);14client.catch('Error: This is an error message')15 .then(function (response) {16 console.log('Response: ' + response);17 });

Full Screen

Using AI Code Generation

copy

Full Screen

1var argosy = require('argosy')2var pattern = require('argosy-pattern')3var service = argosy()4service.accept({5 catch: catchPattern()6}, function (err, reply) {7 console.log(err)8 reply(null, { message: 'Error caught' })9})10service.accept({11 catch: catchPattern({ message: 'error' })12}, function (err, reply) {13 console.log(err)14 reply(null, { message: 'Error caught' })15})16service.accept({17 catch: catchPattern({ message: 'error' })18}, function (err, reply) {19 console.log(err)20 reply(null, { message: 'Error caught' })21})22service.act({23}, function (err, reply) {24 console.log(err)25 console.log(reply)26})27service.act({28}, function (err, reply) {29 console.log(err)30 console.log(reply)31})32service.act({33}, function (err, reply) {34 console.log(err)35 console.log(reply)36})

Full Screen

Using AI Code Generation

copy

Full Screen

1var pattern = require('argosy-pattern')2var catchMethod = catchPattern(function (err) {3 console.log(err)4})5catchMethod(new Error('Catch me if you can'))6var pattern = require('argosy-pattern')7var matchMethod = matchPattern({8})9var matched = matchMethod({10})11console.log(matched)12var pattern = require('argosy-pattern')13var matchMethod = matchPattern({14})15var matched = matchMethod({16})17console.log(matched)18var pattern = require('argosy-pattern')19var matchMethod = matchPattern({20})21var matched = matchMethod({22})23console.log(matched)24var pattern = require('argosy-pattern')25var matchMethod = matchPattern({26})27var matched = matchMethod({28})29console.log(matched)30var pattern = require('argosy-pattern')31var matchMethod = matchPattern({32})33var matched = matchMethod({34})35console.log(matched)

Full Screen

Using AI Code Generation

copy

Full Screen

1var argos = require('argos');2var argv = argos.argv;3if (argv.catch) {4 console.log('catch is set to true');5} else {6 console.log('catch is set to false');7}8var argos = require('argos');9var argv = argos.argv;10if (argv.catch) {11 console.log('catch is set to true');12} else {13 console.log('catch is set to false');14}15var argos = require('argos');16var argv = argos.argv;17console.log(argv._);18var argos = require('argos');19var argv = argos.argv;20console.log(argv._);21var argos = require('argos');22var argv = argos.argv;23console.log(argv._);24var argos = require('argos');25var argv = argos.argv;26console.log(argv._);

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run argos automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful