How to use catch method in Cucumber-gherkin

Best JavaScript code snippet using cucumber-gherkin

02_threads.py

Source:02_threads.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2# Как создать и запустить поток3import random4import time5from collections import defaultdict6from threading import Thread7FISH = (None, 'плотва', 'окунь', 'лещ')8# Определим функцию, эмулирующую рыбалку9def fishing(name, worms):10    catch = defaultdict(int)11    for worm in range(worms):12        print(f'{name}: Червяк № {worm} - Забросил, ждем...', flush=True)13        _ = 3 ** (random.randint(50, 70) * 10000)14        fish = random.choice(FISH)15        if fish is None:16            print(f'{name}: Тьфу, сожрали червяка...', flush=True)17        else:18            print(f'{name}: Ага, у меня {fish}', flush=True)19            catch[fish] += 120    print(f'Итого рыбак {name} поймал:')21    for fish, count in catch.items():22        print(f'    {fish} - {count}')23fishing(name='Вася', worms=10)24# А теперь создадим второго рыбака, пошедшего на рыбалку ОДНОВРЕМЕННО с первым25thread = Thread(target=fishing, kwargs=dict(name='Вася', worms=10))26thread.start()27fishing(name='Коля', worms=10)28thread.join()29# При простой передаче функции в поток результат выполнения функции30# можно получить только через изменяемый обьект в параметрах:31def fishing(name, worms, catch):32    for worm in range(worms):33        print(f'{name}: Червяк № {worm} - Забросил, ждем...', flush=True)34        _ = 3 ** (random.randint(50, 70) * 10000)35        fish = random.choice(FISH)36        if fish is None:37            print(f'{name}: Тьфу, сожрали червяка...', flush=True)38        else:39            print(f'{name}: Ага, у меня {fish}', flush=True)40            catch[fish] += 141vasya_catch = defaultdict(int)42thread = Thread(target=fishing, kwargs=dict(name='Вася', worms=10, catch=vasya_catch))43thread.start()44kolya_catch = defaultdict(int)45fishing(name='Коля', worms=10, catch=kolya_catch)46thread.join()47for name, catch in (('Вася', vasya_catch), ('Вася', kolya_catch)):48    print(f'Итого рыбак {name} поймал:')49    for fish, count in catch.items():50        print(f'    {fish} - {count}')51# Более удобно использовать наследование от класса потока52class Fisher(Thread):53    def __init__(self, name, worms, *args, **kwargs):54        super(Fisher, self).__init__(*args, **kwargs)55        self.name = name56        self.worms = worms57    def run(self):58        catch = defaultdict(int)59        for worm in range(self.worms):60            print(f'{self.name}: Червяк № {worm} - Забросил, ждем...', flush=True)61            _ = 3 ** (random.randint(50, 70) * 10000)62            fish = random.choice(FISH)63            if fish is None:64                print(f'{self.name}: Тьфу, сожрали червяка...', flush=True)65            else:66                print(f'{self.name}: Ага, у меня {fish}', flush=True)67                catch[fish] += 168        print(f'Итого рыбак {self.name} поймал:')69        for fish, count in catch.items():70            print(f'    {fish} - {count}')71vasya = Fisher(name='Вася', worms=10)72kolya = Fisher(name='Коля', worms=10)73print('.' * 20, 'Они пошли на рыбалку')74vasya.start()75kolya.start()76print('.' * 20, 'Ждем пока они вернутся...')77vasya.join()78kolya.join()79print('.' * 20, 'Итак, они вернулись')80# Если нужен результат выполнения, то просто делаем атрибут класса81class Fisher(Thread):82    def __init__(self, name, worms, *args, **kwargs):83        super().__init__(*args, **kwargs)84        self.name = name85        self.worms = worms86        self.catch = defaultdict(int)87    def run(self):88        self.catch = defaultdict(int)89        for worm in range(self.worms):90            print(f'{self.name}: Червяк № {worm} - Забросил, ждем...', flush=True)91            _ = 3 ** (random.randint(50, 70) * 10000)92            fish = random.choice(FISH)93            if fish is None:94                print(f'{self.name}: Тьфу, сожрали червяка...', flush=True)95            else:96                print(f'{self.name}: Ага, у меня {fish}', flush=True)97                self.catch[fish] += 198vasya = Fisher(name='Вася', worms=10)99kolya = Fisher(name='Коля', worms=10)100print('.' * 20, 'Они пошли на рыбалку')101vasya.start()102kolya.start()103print('.' * 20, 'Ждем пока они вернутся...')104vasya.join()105kolya.join()106print('.' * 20, 'Итак, они вернулись')107for fisher in (vasya, kolya):108    print(f'Итого рыбак {fisher.name} поймал:')109    for fish, count in fisher.catch.items():110        print(f'    {fish} - {count}')111# Что будет если в потоке ошибка112class Fisher(Thread):113    def __init__(self, name, worms, *args, **kwargs):114        super().__init__(*args, **kwargs)115        self.name = name116        self.worms = worms117        self.catch = defaultdict(int)118    def run(self):119        self.catch = defaultdict(int)120        for worm in range(self.worms):121            print(f'{self.name}: Червяк № {worm} - Забросил, ждем...', flush=True)122            _ = 3 ** (random.randint(50, 70) * 10000)123            dice = random.randint(1, 5)124            if self.name == 'Коля' and dice == 1:125                raise ValueError(f'{self.name}: Блин, у меня сломалась удочка на {worm} червяке :(')126            fish = random.choice(FISH)127            if fish is None:128                print(f'{self.name}: Тьфу, сожрали червяка...', flush=True)129            else:130                print(f'{self.name}: Ага, у меня {fish}', flush=True)131                self.catch[fish] += 1132vasya = Fisher(name='Вася', worms=10)133kolya = Fisher(name='Коля', worms=10)134print('.' * 20, 'Они пошли на рыбалку')135vasya.start()136kolya.start()137print('.' * 20, 'Ждем пока они вернутся...')138vasya.join()139kolya.join()140print('.' * 20, 'Итак, они вернулись')141for fisher in (vasya, kolya):142    print(f'Итого рыбак {fisher.name} поймал:')143    for fish, count in fisher.catch.items():144        print(f'    {fish} - {count}')145# Обрабатывать ошибки нужно в самом потоке146class Fisher(Thread):147    def __init__(self, name, worms, *args, **kwargs):148        super().__init__(*args, **kwargs)149        self.name = name150        self.worms = worms151        self.catch = defaultdict(int)152    def run(self):153        try:154            self._fishing()155        except Exception as exc:156            print(exc)157    def _fishing(self):158        self.catch = defaultdict(int)159        for worm in range(self.worms):160            print(f'{self.name}: Червяк № {worm} - Забросил, ждем...', flush=True)161            _ = 3 ** (random.randint(50, 70) * 10000)162            dice = random.randint(1, 5)163            if self.name == 'Коля' and dice == 1:164                raise ValueError(f'{self.name}: Блин, у меня сломалась удочка на {worm} червяке :(')165            fish = random.choice(FISH)166            if fish is None:167                print(f'{self.name}: Тьфу, сожрали червяка...', flush=True)168            else:169                print(f'{self.name}: Ага, у меня {fish}', flush=True)170                self.catch[fish] += 1171vasya = Fisher(name='Вася', worms=10)172kolya = Fisher(name='Коля', worms=10)173print('.' * 20, 'Они пошли на рыбалку')174vasya.start()175kolya.start()176print('.' * 20, 'Ждем пока они вернутся...')177vasya.join()178kolya.join()179print('.' * 20, 'Итак, они вернулись')180for fisher in (vasya, kolya):181    print(f'Итого рыбак {fisher.name} поймал:')182    for fish, count in fisher.catch.items():183        print(f'    {fish} - {count}')184# Вроде все прекрасно, но в пайтоне есть суровый GIL - Global Interpreter Lock - https://goo.gl/MTokAe185# GIL велик и ужасен - это механизм блокировки выполнения потоков, пока один выполняется.186# Что, почему, зачем, Карл?! Зачем стрелять себе в ногу?187#188# Все дело в том, что сам пайтон - процесс в операционной системе. И ОС может приостанавливать выполнение189# самого процесса пайтона. А когда происходит двойное засыпание/просыпание,190# то возникает проблема доступа к одним и тем участкам памяти. Поэтому разработчики CPython,191# (а у нас именно эта реализация) решили сделать GIL.192#193# Поэтому нельзя получить выгоду в производительности программы в т.н. CPU-bounded алгоритмах194# (это те, которым требуется процессорное время и не нужны операции ввода/вывода)195class Fisher(Thread):196    def __init__(self, name, worms, *args, **kwargs):197        super().__init__(*args, **kwargs)198        self.name = name199        self.worms = worms200        self.catch = defaultdict(int)201    def run(self):202        self.catch = defaultdict(int)203        for worm in range(self.worms):204            _ = worm ** 10000  # фиксируем время ожидания поклевки205            fish = random.choice(FISH)206            if fish is not None:207                self.catch[fish] += 1208def time_track(func):209    def surrogate(*args, **kwargs):210        started_at = time.time()211        result = func(*args, **kwargs)212        ended_at = time.time()213        elapsed = round(ended_at - started_at, 6)214        print(f'Функция {func.__name__} работала {elapsed} секунд(ы)',)215        return result216    return surrogate217@time_track218def run_in_one_thread(fishers):219    for fisher in fishers:220        fisher.run()221@time_track222def run_in_threads(fishers):223    for fisher in fishers:224        fisher.start()225    for fisher in fishers:226        fisher.join()227humans = ['Васек', 'Колян', 'Петрович', 'Хмурый', 'Клава', ]228fishers = [Fisher(name=name, worms=100) for name in humans]229run_in_one_thread(fishers)230run_in_threads(fishers)231# Хорошая новость в том, что пайтон отпускает GIL перед системными вызовами. Чтение из файла, к примеру,232# может занимать длительное время и совершенно не требует GIL — можно дать шанс другим потокам поработать.233# ...234# Профит! там где есть операции ввода/ввывода: чтение с диска, обменд данными по сети, етс.235# Хорошая новость: любая программа должна обмениваться данными с внешним миром :)236class Fisher(Thread):237    def __init__(self, name, worms, *args, **kwargs):238        super().__init__(*args, **kwargs)239        self.name = name240        self.worms = worms241        self.catch = defaultdict(int)242    def run(self):243        self.catch = defaultdict(int)244        for worm in range(self.worms):245            time.sleep(0.01)  # TODO тут вызов системной функции246            fish = random.choice(FISH)247            if fish is not None:248                self.catch[fish] += 1249def time_track(func):250    def surrogate(*args, **kwargs):251        started_at = time.time()252        result = func(*args, **kwargs)253        ended_at = time.time()254        elapsed = round(ended_at - started_at, 6)255        print(f'Функция {func.__name__} работала {elapsed} секунд(ы)',)256        return result257    return surrogate258@time_track259def run_in_one_thread(fishers):260    for fisher in fishers:261        fisher.run()262@time_track263def run_in_threads(fishers):264    for fisher in fishers:265        fisher.start()266    for fisher in fishers:267        fisher.join()268humans = ['Васек', 'Колян', 'Петрович', 'Хмурый', 'Клава', ]269fishers = [Fisher(name=name, worms=100) for name in humans]270run_in_one_thread(fishers)271run_in_threads(fishers)272# Про обьяснение механизма GIL очень рекомендую посмотреть видео с Moscow Python Meetup:273# Григорий Петров. "GIL в Python: зачем он нужен и как с этим жить"274# http://www.moscowpython.ru/meetup/14/gil-and-python-why/275###276# Извне завершить поток невозможно штатными средствами пайтона.277# И это правильно, вот две основные проблемы с "убийством" потока:278#   - поток может активно работать с данными в этот момент и принудительное завершениие разрушит их целостность.279#   - поток может породить другие потоки - их тоже завершать?280# Что делать? Можно добавить признак выхода:281class Fisher(Thread):282    def __init__(self, name, worms, *args, **kwargs):283        super().__init__(*args, **kwargs)284        self.name = name285        self.worms = worms286        self.catch = defaultdict(int)287        # будем проверять в цикле - а не пора ли нам заканчивать?288        self.need_stop = False289    def run(self):290        self.catch = defaultdict(int)291        for worm in range(self.worms):292            print(f'{self.name}: Червяк № {worm} - Забросил, ждем...', flush=True)293            _ = 3 ** (random.randint(50, 70) * 10000)294            fish = random.choice(FISH)295            if fish is None:296                print(f'{self.name}: Тьфу, сожрали червяка...', flush=True)297            else:298                print(f'{self.name}: Ага, у меня {fish}', flush=True)299                self.catch[fish] += 1300            if self.need_stop:301                print(f'{self.name}: Ой, жена ужинать зовет! Сматываем удочки...', flush=True)302                break303vasya = Fisher(name='Вася', worms=100)304vasya.start()305time.sleep(1)306if vasya.is_alive():  # кстати с помощью этого метода можно проверить выполняется ли еще поток?307    vasya.need_stop = True308vasya.join()  # ожидание завершения обязательно - поток может некоторое время финализировать работу309# Подводя итог, нужно сказать, что в мультипоточном программированиии наши линейный код310# перестают быть линейным (СИНХРОННЫМ). Если ранее мы были уверены в последовательности выполнения кода,311# то при использовании потоков (процессов) код может выполняться АСИНХРОННО: нельзя гарантировать312# что за этим блоком кода будет выполняться вот этот....

Full Screen

Full Screen

test_virt_drivers.py

Source:test_virt_drivers.py Github

copy

Full Screen

1# vim: tabstop=4 shiftwidth=4 softtabstop=42#3#    Copyright 2010 X7 LLC4#5#    Licensed under the Apache License, Version 2.0 (the "License"); you may6#    not use this file except in compliance with the License. You may obtain7#    a copy of the License at8#9#         http://www.apache.org/licenses/LICENSE-2.010#11#    Unless required by applicable law or agreed to in writing, software12#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT13#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the14#    License for the specific language governing permissions and limitations15#    under the License.16import base6417import netaddr18import sys19import traceback20from engine import exception21from engine import flags22from engine import image23from engine import log as logging24from engine import test25from engine.tests import utils as test_utils26libvirt = None27FLAGS = flags.FLAGS28LOG = logging.getLogger('engine.tests.test_virt_drivers')29def catch_notimplementederror(f):30    """Decorator to simplify catching drivers raising NotImplementedError31    If a particular call makes a driver raise NotImplementedError, we32    log it so that we can extract this information afterwards to33    automatically generate a hypervisor/feature support matrix."""34    def wrapped_func(self, *args, **kwargs):35        try:36            return f(self, *args, **kwargs)37        except NotImplementedError:38            frame = traceback.extract_tb(sys.exc_info()[2])[-1]39            LOG.error('%(driver)s does not implement %(method)s' % {40                                               'driver': type(self.connection),41                                               'method': frame[2]})42    wrapped_func.__name__ = f.__name__43    wrapped_func.__doc__ = f.__doc__44    return wrapped_func45class _VirtDriverTestCase(test.TestCase):46    def setUp(self):47        super(_VirtDriverTestCase, self).setUp()48        self.connection = self.driver_module.get_connection('')49        self.ctxt = test_utils.get_test_admin_context()50        self.image_service = image.get_default_image_service()51    def _get_running_instance(self):52        instance_ref = test_utils.get_test_instance()53        network_info = test_utils.get_test_network_info()54        image_info = test_utils.get_test_image_info(None, instance_ref)55        self.connection.spawn(self.ctxt, instance=instance_ref,56                              image_meta=image_info,57                              network_info=network_info)58        return instance_ref, network_info59    @catch_notimplementederror60    def test_init_host(self):61        self.connection.init_host('myhostname')62    @catch_notimplementederror63    def test_list_instances(self):64        self.connection.list_instances()65    @catch_notimplementederror66    def test_list_instances_detail(self):67        self.connection.list_instances_detail()68    @catch_notimplementederror69    def test_spawn(self):70        instance_ref, network_info = self._get_running_instance()71        domains = self.connection.list_instances()72        self.assertIn(instance_ref['name'], domains)73        domains_details = self.connection.list_instances_detail()74        self.assertIn(instance_ref['name'], [i.name for i in domains_details])75    @catch_notimplementederror76    def test_snapshot_not_running(self):77        instance_ref = test_utils.get_test_instance()78        img_ref = self.image_service.create(self.ctxt, {'name': 'snap-1'})79        self.assertRaises(exception.InstanceNotRunning,80                          self.connection.snapshot,81                          self.ctxt, instance_ref, img_ref['id'])82    @catch_notimplementederror83    def test_snapshot_running(self):84        img_ref = self.image_service.create(self.ctxt, {'name': 'snap-1'})85        instance_ref, network_info = self._get_running_instance()86        self.connection.snapshot(self.ctxt, instance_ref, img_ref['id'])87    @catch_notimplementederror88    def test_reboot(self):89        reboot_type = "SOFT"90        instance_ref, network_info = self._get_running_instance()91        self.connection.reboot(instance_ref, network_info, reboot_type)92    @catch_notimplementederror93    def test_get_host_ip_addr(self):94        host_ip = self.connection.get_host_ip_addr()95        # Will raise an exception if it's not a valid IP at all96        ip = netaddr.IPAddress(host_ip)97        # For now, assume IPv4.98        self.assertEquals(ip.version, 4)99    @catch_notimplementederror100    def test_resize_running(self):101        instance_ref, network_info = self._get_running_instance()102        self.connection.resize(instance_ref, 7)103    @catch_notimplementederror104    def test_set_admin_password(self):105        instance_ref, network_info = self._get_running_instance()106        self.connection.set_admin_password(instance_ref, 'p4ssw0rd')107    @catch_notimplementederror108    def test_inject_file(self):109        instance_ref, network_info = self._get_running_instance()110        self.connection.inject_file(instance_ref,111                                    base64.b64encode('/testfile'),112                                    base64.b64encode('testcontents'))113    @catch_notimplementederror114    def test_agent_update(self):115        instance_ref, network_info = self._get_running_instance()116        self.connection.agent_update(instance_ref, 'http://www.x7.org/',117                                     'd41d8cd98f00b204e9800998ecf8427e')118    @catch_notimplementederror119    def test_rescue(self):120        instance_ref, network_info = self._get_running_instance()121        self.connection.rescue(self.ctxt, instance_ref, network_info, None)122    @catch_notimplementederror123    def test_unrescue_unrescued_instance(self):124        instance_ref, network_info = self._get_running_instance()125        self.connection.unrescue(instance_ref, network_info)126    @catch_notimplementederror127    def test_unrescue_rescued_instance(self):128        instance_ref, network_info = self._get_running_instance()129        self.connection.rescue(self.ctxt, instance_ref, network_info, None)130        self.connection.unrescue(instance_ref, network_info)131    @catch_notimplementederror132    def test_poll_rebooting_instances(self):133        self.connection.poll_rebooting_instances(10)134    @catch_notimplementederror135    def test_poll_rescued_instances(self):136        self.connection.poll_rescued_instances(10)137    @catch_notimplementederror138    def test_poll_unconfirmed_resizes(self):139        self.connection.poll_unconfirmed_resizes(10)140    @catch_notimplementederror141    def test_migrate_disk_and_power_off(self):142        instance_ref, network_info = self._get_running_instance()143        instance_type_ref = test_utils.get_test_instance_type()144        self.connection.migrate_disk_and_power_off(145            self.ctxt, instance_ref, 'dest_host', instance_type_ref)146    @catch_notimplementederror147    def test_pause(self):148        instance_ref, network_info = self._get_running_instance()149        self.connection.pause(instance_ref)150    @catch_notimplementederror151    def test_unpause_unpaused_instance(self):152        instance_ref, network_info = self._get_running_instance()153        self.connection.unpause(instance_ref)154    @catch_notimplementederror155    def test_unpause_paused_instance(self):156        instance_ref, network_info = self._get_running_instance()157        self.connection.pause(instance_ref)158        self.connection.unpause(instance_ref)159    @catch_notimplementederror160    def test_suspend(self):161        instance_ref, network_info = self._get_running_instance()162        self.connection.suspend(instance_ref)163    @catch_notimplementederror164    def test_resume_unsuspended_instance(self):165        instance_ref, network_info = self._get_running_instance()166        self.connection.resume(instance_ref)167    @catch_notimplementederror168    def test_resume_suspended_instance(self):169        instance_ref, network_info = self._get_running_instance()170        self.connection.suspend(instance_ref)171        self.connection.resume(instance_ref)172    @catch_notimplementederror173    def test_destroy_instance_nonexistant(self):174        fake_instance = {'id': 42, 'name': 'I just made this up!'}175        network_info = test_utils.get_test_network_info()176        self.connection.destroy(fake_instance, network_info)177    @catch_notimplementederror178    def test_destroy_instance(self):179        instance_ref, network_info = self._get_running_instance()180        self.assertIn(instance_ref['name'],181                      self.connection.list_instances())182        self.connection.destroy(instance_ref, network_info)183        self.assertNotIn(instance_ref['name'],184                         self.connection.list_instances())185    @catch_notimplementederror186    def test_attach_detach_volume(self):187        instance_ref, network_info = self._get_running_instance()188        self.connection.attach_volume({'driver_volume_type': 'fake'},189                                      instance_ref['name'],190                                      '/mnt/engine/something')191        self.connection.detach_volume({'driver_volume_type': 'fake'},192                                      instance_ref['name'],193                                      '/mnt/engine/something')194    @catch_notimplementederror195    def test_get_info(self):196        instance_ref, network_info = self._get_running_instance()197        info = self.connection.get_info(instance_ref['name'])198        self.assertIn('state', info)199        self.assertIn('max_mem', info)200        self.assertIn('mem', info)201        self.assertIn('num_cpu', info)202        self.assertIn('cpu_time', info)203    @catch_notimplementederror204    def test_get_info_for_unknown_instance(self):205        self.assertRaises(exception.NotFound,206                          self.connection.get_info, 'I just made this name up')207    @catch_notimplementederror208    def test_get_diagnostics(self):209        instance_ref, network_info = self._get_running_instance()210        self.connection.get_diagnostics(instance_ref['name'])211    @catch_notimplementederror212    def test_list_disks(self):213        instance_ref, network_info = self._get_running_instance()214        self.connection.list_disks(instance_ref['name'])215    @catch_notimplementederror216    def test_list_interfaces(self):217        instance_ref, network_info = self._get_running_instance()218        self.connection.list_interfaces(instance_ref['name'])219    @catch_notimplementederror220    def test_block_stats(self):221        instance_ref, network_info = self._get_running_instance()222        stats = self.connection.block_stats(instance_ref['name'], 'someid')223        self.assertEquals(len(stats), 5)224    @catch_notimplementederror225    def test_interface_stats(self):226        instance_ref, network_info = self._get_running_instance()227        stats = self.connection.interface_stats(instance_ref['name'], 'someid')228        self.assertEquals(len(stats), 8)229    @catch_notimplementederror230    def test_get_console_output(self):231        instance_ref, network_info = self._get_running_instance()232        console_output = self.connection.get_console_output(instance_ref)233        self.assertTrue(isinstance(console_output, basestring))234    @catch_notimplementederror235    def test_get_ajax_console(self):236        instance_ref, network_info = self._get_running_instance()237        ajax_console = self.connection.get_ajax_console(instance_ref)238        self.assertIn('token', ajax_console)239        self.assertIn('host', ajax_console)240        self.assertIn('port', ajax_console)241    @catch_notimplementederror242    def test_get_vnc_console(self):243        instance_ref, network_info = self._get_running_instance()244        vnc_console = self.connection.get_vnc_console(instance_ref)245        self.assertIn('token', vnc_console)246        self.assertIn('host', vnc_console)247        self.assertIn('port', vnc_console)248    @catch_notimplementederror249    def test_get_console_pool_info(self):250        instance_ref, network_info = self._get_running_instance()251        console_pool = self.connection.get_console_pool_info(instance_ref)252        self.assertIn('address', console_pool)253        self.assertIn('username', console_pool)254        self.assertIn('password', console_pool)255    @catch_notimplementederror256    def test_refresh_security_group_rules(self):257        # FIXME: Create security group and add the instance to it258        instance_ref, network_info = self._get_running_instance()259        self.connection.refresh_security_group_rules(1)260    @catch_notimplementederror261    def test_refresh_security_group_members(self):262        # FIXME: Create security group and add the instance to it263        instance_ref, network_info = self._get_running_instance()264        self.connection.refresh_security_group_members(1)265    @catch_notimplementederror266    def test_refresh_provider_fw_rules(self):267        instance_ref, network_info = self._get_running_instance()268        self.connection.refresh_provider_fw_rules()269    @catch_notimplementederror270    def test_update_available_resource(self):271        self.compute = self.start_service('compute', host='dummy')272        self.connection.update_available_resource(self.ctxt, 'dummy')273    @catch_notimplementederror274    def test_compare_cpu(self):275        cpu_info = '''{ "topology": {276                               "sockets": 1,277                               "cores": 2,278                               "threads": 1 },279                        "features": [280                            "xtpr",281                            "tm2",282                            "est",283                            "vmx",284                            "ds_cpl",285                            "monitor",286                            "pbe",287                            "tm",288                            "ht",289                            "ss",290                            "acpi",291                            "ds",292                            "vme"],293                        "arch": "x86_64",294                        "model": "Penryn",295                        "vendor": "Intel" }'''296        self.connection.compare_cpu(cpu_info)297    @catch_notimplementederror298    def test_ensure_filtering_for_instance(self):299        instance_ref = test_utils.get_test_instance()300        network_info = test_utils.get_test_network_info()301        self.connection.ensure_filtering_rules_for_instance(instance_ref,302                                                            network_info)303    @catch_notimplementederror304    def test_unfilter_instance(self):305        instance_ref = test_utils.get_test_instance()306        network_info = test_utils.get_test_network_info()307        self.connection.unfilter_instance(instance_ref, network_info)308    @catch_notimplementederror309    def test_live_migration(self):310        instance_ref, network_info = self._get_running_instance()311        self.connection.live_migration(self.ctxt, instance_ref, 'otherhost',312                                       None, None)313    @catch_notimplementederror314    def _check_host_status_fields(self, host_status):315        self.assertIn('disk_total', host_status)316        self.assertIn('disk_used', host_status)317        self.assertIn('host_memory_total', host_status)318        self.assertIn('host_memory_free', host_status)319    @catch_notimplementederror320    def test_update_host_status(self):321        host_status = self.connection.update_host_status()322        self._check_host_status_fields(host_status)323    @catch_notimplementederror324    def test_get_host_stats(self):325        host_status = self.connection.get_host_stats()326        self._check_host_status_fields(host_status)327    @catch_notimplementederror328    def test_set_host_enabled(self):329        self.connection.set_host_enabled('a useless argument?', True)330    @catch_notimplementederror331    def test_host_power_action_reboot(self):332        self.connection.host_power_action('a useless argument?', 'reboot')333    @catch_notimplementederror334    def test_host_power_action_shutdown(self):335        self.connection.host_power_action('a useless argument?', 'shutdown')336    @catch_notimplementederror337    def test_host_power_action_startup(self):338        self.connection.host_power_action('a useless argument?', 'startup')339class AbstractDriverTestCase(_VirtDriverTestCase):340    def setUp(self):341        import engine.virt.driver342        self.driver_module = engine.virt.driver343        def get_driver_connection(_):344            return engine.virt.driver.ComputeDriver()345        self.driver_module.get_connection = get_driver_connection346        super(AbstractDriverTestCase, self).setUp()347class FakeConnectionTestCase(_VirtDriverTestCase):348    def setUp(self):349        import engine.virt.fake350        self.driver_module = engine.virt.fake351        super(FakeConnectionTestCase, self).setUp()352class LibvirtConnTestCase(_VirtDriverTestCase):353    def setUp(self):354        # Put fakelibvirt in place355        if 'libvirt' in sys.modules:356            self.saved_libvirt = sys.modules['libvirt']357        else:358            self.saved_libvirt = None359        import fakelibvirt360        import fake_libvirt_utils361        sys.modules['libvirt'] = fakelibvirt362        import engine.virt.libvirt.connection363        import engine.virt.libvirt.firewall364        engine.virt.libvirt.connection.libvirt = fakelibvirt365        engine.virt.libvirt.connection.libvirt_utils = fake_libvirt_utils366        engine.virt.libvirt.firewall.libvirt = fakelibvirt367        # Point _VirtDriverTestCase at the right module368        self.driver_module = engine.virt.libvirt.connection369        super(LibvirtConnTestCase, self).setUp()370        FLAGS.rescue_image_id = "2"371        FLAGS.rescue_kernel_id = "3"372        FLAGS.rescue_ramdisk_id = None373    def tearDown(self):374        super(LibvirtConnTestCase, self).setUp()375        # Restore libvirt376        import engine.virt.libvirt.connection377        import engine.virt.libvirt.firewall378        if self.saved_libvirt:379            sys.modules['libvirt'] = self.saved_libvirt380            engine.virt.libvirt.connection.libvirt = self.saved_libvirt381            engine.virt.libvirt.connection.libvirt_utils = self.saved_libvirt...

Full Screen

Full Screen

common_catch_and_log.py

Source:common_catch_and_log.py Github

copy

Full Screen

1# Copyright 2014 Google Inc. All Rights Reserved.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7#      http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS-IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14"""Unit tests for logger."""15__author__ = 'Mike Gainer (mgainer@google.com)'16import datetime17import unittest18import appengine_config19from common import catch_and_log20class CatchAndLogTests(unittest.TestCase):21    def setUp(self):22        appengine_config.PRODUCTION_MODE = False23        self._catch_and_log = catch_and_log.CatchAndLog()24        self._expected = []25    def test_simple(self):26        complaint = 'No cheese, Gromit!'27        self._catch_and_log.critical(complaint)28        self._expect(complaint, catch_and_log._CRITICAL)29        self._assert_logs_match()30    def test_multiple(self):31        complaints = [32            'Failed to find cheese in pantry',33            'Failed to install cheese to mousetrap',34            'Failed to arm trap',35            'Failed to catch mouse'36            ]37        for complaint in complaints:38            self._catch_and_log.critical(complaint)39            self._expect(complaint, catch_and_log._CRITICAL)40        self._assert_logs_match()41    def test_multiple_levels(self):42        complaint = 'No cheese, Gromit!'43        self._catch_and_log.critical(complaint)44        self._expect(complaint, catch_and_log._CRITICAL)45        complaint = 'Moon out of range!'46        self._catch_and_log.warn(complaint)47        self._expect(complaint, catch_and_log._WARNING)48        complaint = 'Parking brake engaged!'49        self._catch_and_log.warning(complaint)50        self._expect(complaint, catch_and_log._WARNING)51        complaint = 'Five mice spectating'52        self._catch_and_log.info(complaint)53        self._expect(complaint, catch_and_log._INFO)54        complaint = 'Red light blinking'55        self._catch_and_log.info(complaint)56        self._expect(complaint, catch_and_log._INFO)57        complaint = 'Low blinker fluid'58        self._catch_and_log.warning(complaint)59        self._expect(complaint, catch_and_log._WARNING)60        complaint = 'Insufficient fuel for landing!'61        self._catch_and_log.critical(complaint)62        self._expect(complaint, catch_and_log._CRITICAL)63        self._assert_logs_match()64    def test_exception_suppressed(self):65        topic = 'Entering Orbit'66        complaint = 'Perigee below surface of Moon!'67        with self._catch_and_log.consume_exceptions(topic):68            raise ValueError(complaint)69        self._expect(70            '%s: ValueError: %s at   ' % (topic, complaint) +71            'File "/tests/unit/common_catch_and_log.py", line 86, in '72            'test_exception_suppressed\n    raise ValueError(complaint)\n',73            catch_and_log._CRITICAL)74        self._assert_logs_match()75    def test_exception_propagates(self):76        topic = 'Entering Orbit'77        complaint = 'Perigee below surface of Moon!'78        with self.assertRaises(ValueError):79            with self._catch_and_log.propagate_exceptions(topic):80                raise ValueError(complaint)81        self._expect(82            '%s: ValueError: %s at   ' % (topic, complaint) +83            'File "/tests/unit/common_catch_and_log.py", line 100, in '84            'test_exception_propagates\n    raise ValueError(complaint)\n',85            catch_and_log._CRITICAL)86        self._assert_logs_match()87    def test_traceback_info_suppressed_in_production(self):88        appengine_config.PRODUCTION_MODE = True89        topic = 'Entering Orbit'90        complaint = 'Perigee below surface of Moon!'91        with self._catch_and_log.consume_exceptions(topic):92            raise ValueError(complaint)93        self._expect('%s: ValueError: %s' % (topic, complaint),94                     catch_and_log._CRITICAL)95        self._assert_logs_match()96    def _expect(self, message, level):97        self._expected.append({98            'message': message,99            'level': level,100            'timestamp': datetime.datetime.now().strftime(101                catch_and_log._LOG_DATE_FORMAT)})102    def _assert_logs_match(self):103        if len(self._expected) != len(self._catch_and_log.get()):104            self.fail('Expected %d entries, but have %d' %105                      (len(self._expected), len(self._catch_and_log.get())))106        for expected, actual in zip(self._expected, self._catch_and_log.get()):107            self.assertEquals(expected['level'], actual['level'])108            self.assertEquals(expected['message'], actual['message'])109            expected_time = datetime.datetime.strptime(110                expected['timestamp'], catch_and_log._LOG_DATE_FORMAT)111            actual_time = datetime.datetime.strptime(112                actual['timestamp'], catch_and_log._LOG_DATE_FORMAT)113            self.assertAlmostEqual(...

Full Screen

Full Screen

__init__.py

Source:__init__.py Github

copy

Full Screen

1# coding=utf-82from __future__ import unicode_literals3from .. import Provider as CompanyProvider4class Provider(CompanyProvider):5    formats = (6        '{{last_name}} {{company_suffix}}',7        '{{last_name}} {{last_name}} {{company_suffix}}',8        '{{last_name}}',9        '{{last_name}}',10    )11    catch_phrase_formats = (12        '{{catch_phrase_noun}} {{catch_phrase_verb}} {{catch_phrase_attribute}}', )13    nouns = (14        'la sécurité',15        'le plaisir',16        'le confort',17        'la simplicité',18        "l'assurance",19        "l'art",20        'le pouvoir',21        'le droit',22        'la possibilité',23        "l'avantage",24        'la liberté')25    verbs = (26        'de rouler',27        "d'avancer",28        "d'évoluer",29        'de changer',30        "d'innover",31        'de louer',32        "d'atteindre vos buts",33        'de concrétiser vos projets')34    attributes = (35        'de manière efficace',36        'plus rapidement',37        'plus facilement',38        'plus simplement',39        'en toute tranquilité',40        'avant-tout',41        'autrement',42        'naturellement',43        'à la pointe',44        'sans soucis',45        "à l'état pur",46        'à sa source',47        'de manière sûre',48        'en toute sécurité')49    company_suffixes = ('SA', 'S.A.', 'SARL', 'S.A.R.L.', 'S.A.S.', 'et Fils')50    siren_format = "### ### ###"51    def catch_phrase_noun(self):52        """53        Returns a random catch phrase noun.54        """55        return self.random_element(self.nouns)56    def catch_phrase_attribute(self):57        """58        Returns a random catch phrase attribute.59        """60        return self.random_element(self.attributes)61    def catch_phrase_verb(self):62        """63        Returns a random catch phrase verb.64        """65        return self.random_element(self.verbs)66    def catch_phrase(self):67        """68        :example 'integrate extensible convergence'69        """70        catch_phrase = ""71        while True:72            pattern = self.random_element(self.catch_phrase_formats)73            catch_phrase = self.generator.parse(pattern)74            catch_phrase = catch_phrase[0].upper() + catch_phrase[1:]75            if self._is_catch_phrase_valid(catch_phrase):76                break77        return catch_phrase78    # An array containing string which should not appear twice in a catch phrase79    words_which_should_not_appear_twice = ('sécurité', 'simpl')80    def _is_catch_phrase_valid(self, catch_phrase):81        """82        Validates a french catch phrase.83        :param catch_phrase: The catch phrase to validate.84        """85        for word in self.words_which_should_not_appear_twice:86            # Fastest way to check if a piece of word does not appear twice.87            begin_pos = catch_phrase.find(word)88            end_pos = catch_phrase.find(word, begin_pos + 1)89            if begin_pos != -1 and begin_pos != end_pos:90                return False91        return True92    def siren(self):93        """94        Generates a siren number (9 digits).95        """96        return self.numerify(self.siren_format)97    def siret(self, max_sequential_digits=2):98        """99        Generates a siret number (14 digits).100        It is in fact the result of the concatenation of a siren number (9 digits),101        a sequential number (4 digits) and a control number (1 digit) concatenation.102        If $max_sequential_digits is invalid, it is set to 2.103        :param max_sequential_digits The maximum number of digits for the sequential number (> 0 && <= 4).104        """105        if max_sequential_digits > 4 or max_sequential_digits <= 0:106            max_sequential_digits = 2107        sequential_number = str(self.random_number(108            max_sequential_digits)).zfill(4)...

Full Screen

Full Screen

test_helpers.py

Source:test_helpers.py Github

copy

Full Screen

1#!/usr/bin/env python2# This Source Code Form is subject to the terms of the Mozilla Public3# License, v. 2.0. If a copy of the MPL was not distributed with this file,4# You can obtain one at http://mozilla.org/MPL/2.0/.5import sys6import unittest7import os8from lib.helpers import *9from mock import call, MagicMock10dirname = os.path.dirname(os.path.realpath(__file__))11sys.path.append(os.path.join(dirname, '..'))12class RetryFunc():13    def __init__(self):14        self.ran = 015        self.calls = []16        self.err = UserWarning17    def succeed(self, count):18        self.ran = self.ran + 119        self.calls.append(count)20    def fail(self, count):21        self.ran = self.ran + 122        self.calls.append(count)23        raise self.err24class TestRetryFunc(unittest.TestCase):25    def setUp(self):26        self.retry_func = RetryFunc()27        self.catch_func = RetryFunc()28        self.example_method = MagicMock()29        self.throw_exception = False30    def run_logic(self):31        self.example_method('run_logic1')32        if self.throw_exception:33            self.throw_exception = False34            raise UserWarning('error')35        self.example_method('run_logic2')36    def handler_logic(self):37        self.example_method('handler_logic')38    def exception_caught(self):39        self.example_method('exception_handler1')40        retry_func(41            lambda ran: self.handler_logic(),42            catch_func=lambda ran: self.exception_caught(),43            catch=UserWarning, retries=344        )45        self.example_method('exception_handler2')46    def test_passes_retry_count(self):47        self.assertRaises(48            self.retry_func.err,49            retry_func,50            self.retry_func.fail,51            catch=UserWarning, retries=352        )53        self.assertEqual(self.retry_func.calls, [0, 1, 2, 3])54    def test_retries_on_fail(self):55        self.assertRaises(56            self.retry_func.err,57            retry_func,58            self.retry_func.fail,59            catch=UserWarning, retries=360        )61        self.assertEqual(self.retry_func.ran, 4)62    def test_run_catch_func_on_fail(self):63        self.assertRaises(64            self.retry_func.err,65            retry_func,66            self.retry_func.fail,67            catch_func=self.catch_func.succeed,68            catch=UserWarning, retries=369        )70        self.assertEqual(self.catch_func.ran, 4)71    def test_no_retry_on_success(self):72        retry_func(73            self.retry_func.succeed,74            catch=UserWarning, retries=375        )76        self.assertEqual(self.retry_func.ran, 1)77    def test_no_run_catch_func_on_success(self):78        retry_func(79            self.retry_func.succeed,80            catch_func=self.catch_func.succeed,81            catch=UserWarning, retries=382        )83        self.assertEqual(self.catch_func.ran, 0)84    def test_runs_logic_and_handler_in_proper_order(self):85        self.throw_exception = True86        retry_func(87            lambda ran: self.run_logic(),88            catch_func=lambda ran: self.exception_caught(),89            catch=UserWarning, retries=190        )91        expected = [call('run_logic1'),92                    call('exception_handler1'),93                    call('handler_logic'),94                    call('exception_handler2'),95                    call('run_logic1'),96                    call('run_logic2')]97        self.assertEqual(expected, self.example_method.mock_calls)98if __name__ == '__main__':...

Full Screen

Full Screen

Motion.py

Source:Motion.py Github

copy

Full Screen

1from direct.fsm import StateData2from toontown.toonbase import ToontownGlobals3from direct.directnotify import DirectNotifyGlobal4from direct.fsm import ClassicFSM, State5from direct.fsm import State6import TTEmote7from otp.avatar import Emote8class Motion(StateData.StateData):9    notify = DirectNotifyGlobal.directNotify.newCategory('Motion')10    def __init__(self, toon):11        self.lt = toon12        self.doneEvent = 'motionDone'13        StateData.StateData.__init__(self, self.doneEvent)14        self.fsm = ClassicFSM.ClassicFSM('Motion', [State.State('off', self.enterOff, self.exitOff),15         State.State('neutral', self.enterNeutral, self.exitNeutral),16         State.State('walk', self.enterWalk, self.exitWalk),17         State.State('run', self.enterRun, self.exitRun),18         State.State('sad-neutral', self.enterSadNeutral, self.exitSadNeutral),19         State.State('sad-walk', self.enterSadWalk, self.exitSadWalk),20         State.State('catch-neutral', self.enterCatchNeutral, self.exitCatchNeutral),21         State.State('catch-run', self.enterCatchRun, self.exitCatchRun),22         State.State('catch-eatneutral', self.enterCatchEatNeutral, self.exitCatchEatNeutral),23         State.State('catch-eatnrun', self.enterCatchEatNRun, self.exitCatchEatNRun)], 'off', 'off')24        self.fsm.enterInitialState()25    def delete(self):26        del self.fsm27    def load(self):28        pass29    def unload(self):30        pass31    def enter(self):32        self.notify.debug('enter')33    def exit(self):34        self.fsm.requestFinalState()35    def enterOff(self, rate = 0):36        self.notify.debug('enterOff')37    def exitOff(self):38        pass39    def enterNeutral(self, rate = 0):40        self.notify.debug('enterNeutral')41    def exitNeutral(self):42        self.notify.debug('exitNeutral')43    def enterWalk(self, rate = 0):44        self.notify.debug('enterWalk')45        Emote.globalEmote.disableBody(self.lt, 'enterWalk')46    def exitWalk(self):47        self.notify.debug('exitWalk')48        Emote.globalEmote.releaseBody(self.lt, 'exitWalk')49    def enterRun(self, rate = 0):50        self.notify.debug('enterRun')51        Emote.globalEmote.disableBody(self.lt, 'enterRun')52    def exitRun(self):53        self.notify.debug('exitRun')54        Emote.globalEmote.releaseBody(self.lt, 'exitRun')55    def enterSadNeutral(self, rate = 0):56        self.notify.debug('enterSadNeutral')57    def exitSadNeutral(self):58        self.notify.debug('exitSadNeutral')59    def enterSadWalk(self, rate = 0):60        self.notify.debug('enterSadWalk')61    def exitSadWalk(self):62        pass63    def enterCatchNeutral(self, rate = 0):64        self.notify.debug('enterCatchNeutral')65    def exitCatchNeutral(self):66        self.notify.debug('exitCatchNeutral')67    def enterCatchRun(self, rate = 0):68        self.notify.debug('enterCatchRun')69    def exitCatchRun(self):70        self.notify.debug('exitCatchRun')71    def enterCatchEatNeutral(self, rate = 0):72        self.notify.debug('enterCatchEatNeutral')73    def exitCatchEatNeutral(self):74        self.notify.debug('exitCatchEatNeutral')75    def enterCatchEatNRun(self, rate = 0):76        self.notify.debug('enterCatchEatNRun')77    def exitCatchEatNRun(self):78        self.notify.debug('exitCatchEatNRun')79    def setState(self, anim, rate):80        toon = self.lt81        if toon.playingAnim != anim:...

Full Screen

Full Screen

runner.py

Source:runner.py Github

copy

Full Screen

1#!/usr/bin/python -tt2#3# Copyright (c) 2011 Intel, Inc.4#5# This program is free software; you can redistribute it and/or modify it6# under the terms of the GNU General Public License as published by the Free7# Software Foundation; version 2 of the License8#9# This program is distributed in the hope that it will be useful, but10# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY11# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License12# for more details.13#14# You should have received a copy of the GNU General Public License along15# with this program; if not, write to the Free Software Foundation, Inc., 5916# Temple Place - Suite 330, Boston, MA 02111-1307, USA.17import os18import subprocess19from mic import msger20def runtool(cmdln_or_args, catch=1):21    """ wrapper for most of the subprocess calls22    input:23        cmdln_or_args: can be both args and cmdln str (shell=True)24        catch: 0, quitely run25               1, only STDOUT26               2, only STDERR27               3, both STDOUT and STDERR28    return:29        (rc, output)30        if catch==0: the output will always None31    """32    if catch not in (0, 1, 2, 3):33        # invalid catch selection, will cause exception, that's good34        return None35    if isinstance(cmdln_or_args, list):36        cmd = cmdln_or_args[0]37        shell = False38    else:39        import shlex40        cmd = shlex.split(cmdln_or_args)[0]41        shell = True42    if catch != 3:43        dev_null = os.open("/dev/null", os.O_WRONLY)44    if catch == 0:45        sout = dev_null46        serr = dev_null47    elif catch == 1:48        sout = subprocess.PIPE49        serr = dev_null50    elif catch == 2:51        sout = dev_null52        serr = subprocess.PIPE53    elif catch == 3:54        sout = subprocess.PIPE55        serr = subprocess.STDOUT56    try:57        p = subprocess.Popen(cmdln_or_args, stdout=sout,58                             stderr=serr, shell=shell)59        (sout, serr) = p.communicate()60        # combine stdout and stderr, filter None out61        out = ''.join(filter(None, [sout, serr]))62    except OSError, e:63        if e.errno == 2:64            # [Errno 2] No such file or directory65            msger.error('Cannot run command: %s, lost dependency?' % cmd)66        else:67            raise # relay68    finally:69        if catch != 3:70            os.close(dev_null)71    return (p.returncode, out)72def show(cmdln_or_args):73    # show all the message using msger.verbose74    rc, out = runtool(cmdln_or_args, catch=3)75    if isinstance(cmdln_or_args, list):76        cmd = ' '.join(cmdln_or_args)77    else:78        cmd = cmdln_or_args79    msg =  'running command: "%s"' % cmd80    if out: out = out.strip()81    if out:82        msg += ', with output::'83        msg += '\n  +----------------'84        for line in out.splitlines():85            msg += '\n  | %s' % line86        msg += '\n  +----------------'87    msger.verbose(msg)88    return rc89def outs(cmdln_or_args, catch=1):90    # get the outputs of tools91    return runtool(cmdln_or_args, catch)[1].strip()92def quiet(cmdln_or_args):...

Full Screen

Full Screen

location.py

Source:location.py Github

copy

Full Screen

1#_*_ coding=utf-8 _*_2from selenium import webdriver3from selenium.webdriver.support.ui import Select4from selenium.webdriver.common.action_chains import ActionChains5from poseidon.base import CommonBase as cb6'''7简易封装定位单个元素和一组元素的方法8'''9"""定位单个元素"""10@cb.com_try_catch11def findId(driver, id):12    f = driver.find_element_by_id(id)13    return f14@cb.com_try_catch15def findName(driver, name):16    f = driver.find_element_by_name(name)17    return f18@cb.com_try_catch19def findClassName(driver, name):20    f = driver.find_element_by_class_name(name)21    return f22@cb.com_try_catch23def findTagName(driver, name):24    f = driver.find_element_by_tag_name(name)25    return f26@cb.com_try_catch27def findLinkText(driver, text):28    f = driver.find_element_by_link_text(text)29    return f30@cb.com_try_catch31def findPLinkText(driver, text):32    f = driver.find_element_by_partial_link_text(text)33    return f34@cb.com_try_catch35def findXpath(driver, xpath):36    f = driver.find_element_by_xpath(xpath)37    return f38@cb.com_try_catch39def findCss(driver, css):40    f = driver.find_element_by_css_selector(css)41    return f42'''定位一组元素'''43@cb.com_try_catch44def findsId(driver, id):45    f = driver.find_elements_by_id(id)46    return f47@cb.com_try_catch48def findsName(driver, name):49    f = driver.find_elements_by_name(name)50    return f51@cb.com_try_catch52def findsClassName(driver, name):53    f = driver.find_elements_by_class_name(name)54    return f55@cb.com_try_catch56def findsTagName(driver, name):57    f = driver.find_elements_by_tag_name(name)58    return f59@cb.com_try_catch60def findsLinkText(driver, text):61    f = driver.find_elements_by_link_text(text)62    return f63@cb.com_try_catch64def findsPLinkText(driver, text):65    f = driver.find_elements_by_partial_link_text(text)66    return f67@cb.com_try_catch68def findsXpath(driver, xpath):69    f = driver.find_elements_by_xpath(xpath)70    return f71@cb.com_try_catch72def findsCss(driver, css):73    f = driver.find_elements_by_css_selector(css)74    return f75def findDropdown(driver, xpath, index=0, tag_name="li"):76    """77    :param driver:78    :param xpath: 下拉列表的xpath(或其他定位)定位79    :return:80    """81    element = findXpath(driver, xpath)82    if element.tag_name.lower() != "select":83        findXpath(element, tag_name + "[" + str(index) + "]").click()84    else:85        Select(findsXpath(driver, element)).select_by_index(index)86    # aa = findsXpath(temp, "li")87    # print "success is %s" % len(aa)88def clikUnClickableElement(driver, xpath):89    f = findXpath(driver, xpath)...

Full Screen

Full Screen

Using AI Code Generation

copy

Full Screen

1const { Given, When, Then } = require('cucumber');2const assert = require('assert');3const { Builder, By, Key, until } = require('selenium-webdriver');4const { expect } = require('chai');5const { Given, When, Then } = require('cucumber');6const { defineSupportCode } = require('cucumber');7const { Builder, By, Key, until } = require('selenium-webdriver');8const { expect } = require('chai');9const { Given, When, Then } = require('cucumber');10const { defineSupportCode } = require('cucumber');11const { Builder, By, Key, until } = require('selenium-webdriver');12const { expect } = require('chai');13const { Given, When, Then } = require('cucumber');14const { defineSupportCode } = require('cucumber');15const { Builder, By, Key, until } = require('selenium-webdriver');16const { expect } = require('chai');17defineSupportCode(function({ Given, When, Then }) {18    Given('I am on the Google search page', function () {19    });20    When('I search for {stringInDoubleQuotes}', function (searchString) {21        return this.driver.findElement(By.name('q')).sendKeys(searchString, Key.RETURN);22    });23    Then('the page title should start with {stringInDoubleQuotes}', function (expectedTitle) {24        return this.driver.getTitle().then(function (title) {25            expect(title).to.equal(expectedTitle);26        });27    });28});29defineSupportCode(function({ Given, When, Then }) {30    Given('I am on the Google search page', function () {31    });32    When('I search for {stringInDoubleQuotes}', function (searchString) {33        return this.driver.findElement(By.name('q')).sendKeys(searchString, Key.RETURN);34    });35    Then('the page title should start with {stringInDoubleQuotes}', function (expectedTitle) {36        return this.driver.getTitle().then(function (title) {37            expect(title).to.equal(expectedTitle);38        });39    });40});41defineSupportCode(function({ Given, When, Then }) {42    Given('I am on the Google search page', function () {43    });

Full Screen

Using AI Code Generation

copy

Full Screen

1var {defineSupportCode} = require('cucumber');2var {setDefaultTimeout} = require('cucumber');3setDefaultTimeout(60 * 1000);4defineSupportCode(function({Given, When, Then}) {5    Given(/^I go to the "([^"]*)" page$/, function (arg1, callback) {6        callback(null, 'pending');7    });8    When(/^I click on "([^"]*)"$/, function (arg1, callback) {9        callback(null, 'pending');10    });11    Then(/^I should see the "([^"]*)" page$/, function (arg1, callback) {12        callback(null, 'pending');13    });14});15public void acceptAlert() {16    Alert alert = driver.switchTo().alert();17    alert.accept();18}19public void dismissAlert() {20    Alert alert = driver.switchTo().alert();21    alert.dismiss();22}23public String getAlertText() {24    Alert alert = driver.switchTo().alert();25    return alert.getText();26}27public void sendKeysToAlert(String keys) {28    Alert alert = driver.switchTo().alert();29    alert.sendKeys(keys);30}31Before(function() {32    this.acceptAlert();33});34var {

Full Screen

Using AI Code Generation

copy

Full Screen

1const { Given, When, Then, After, Before } = require('cucumber');2const { expect } = require('chai');3const { Builder, By, Key, until } = require('selenium-webdriver');4const chrome = require('selenium-webdriver/chrome');5const path = require('chromedriver').path;6const options = new chrome.Options();7options.addArguments('headless');8options.addArguments('disable-gpu');9var driver = new Builder()10  .forBrowser('chrome')11  .setChromeOptions(options)12  .build();13Given('I am on the Google search page', async function () {14});15When('I search for {string}', async function (string) {16  await driver.findElement(By.name('q')).sendKeys(string, Key.RETURN);17});18Then('the page title should start with {string}', async function (string) {19  await driver.wait(until.titleIs(string), 1000);20});21After(async function () {22  await driver.quit();23});24Before(async function () {25  await driver.manage().window().setRect({ width: 1366, height: 768 });26});27{28  "scripts": {29  },30  "dependencies": {31  }32}33module.exports = {34  default: `--format-options '{"snippetInterface": "synchronous"}'`35};36const { Given, When, Then, After, Before } = require('cucumber');37const { expect } = require('chai');38const { Builder, By, Key, until } = require('selenium-webdriver

Full Screen

Using AI Code Generation

copy

Full Screen

1var {defineSupportCode} = require('cucumber');2defineSupportCode(function({Given, When, Then}) {3  Given(/^I have entered (\d+) into the calculator$/, function (arg1) {4    return 'pending';5  });6  When(/^I press add$/, function () {7    return 'pending';8  });9  Then(/^the result should be (\d+) on the screen$/, function (arg1) {10    return 'pending';11  });12});13var {defineSupportCode} = require('cucumber');14defineSupportCode(function({Given, When, Then}) {15  Given(/^I have entered (\d+) into the calculator$/, function (arg1) {16    return 'pending';17  });18  When(/^I press add$/, function () {19    return 'pending';20  });21  Then(/^the result should be (\d+) on the screen$/, function (arg1) {22    return 'pending';23  });24});25var {defineSupportCode} = require('cucumber');26defineSupportCode(function({Given, When, Then}) {27  Given(/^I have entered (\d+) into the calculator$/, function (arg1) {28    return 'pending';29  });30  When(/^I press add$/, function () {31    return 'pending';32  });33  Then(/^the result should be (\d+) on the screen$/, function (arg1

Full Screen

Using AI Code Generation

copy

Full Screen

1import { Given, When, Then } from 'cucumber';2import { expect } from 'chai';3import { browser, element, by } from 'protractor';4Given('I am on the Google page', async () => {5});6When('I type {string} in the search field', async (string) => {7    await element(by.name('q')).sendKeys(string);8});9Then('I should see {string} in the title', async (string) => {10    await expect(browser.getTitle()).to.eventually.equal(string);11});12Then('I should see {string} in the results', async (string) => {13    await expect(element(by.css('#result-stats')).getText()).to.eventually.contain(string);14});15import { Given, When, Then } from 'cucumber';16import { expect } from 'chai';17import { browser, element, by } from 'protractor';18Given('I am on the Google page', async () => {19});20When('I type {string} in the search field', async (string) => {21    await element(by.name('q')).sendKeys(string);22});23Then('I should see {string} in the title', async (string) => {24    await expect(browser.getTitle()).to.eventually.equal(string);25});26Then('I should see {string} in the results', async (string) => {27    await expect(element(by.css('#result-stats')).getText()).to.eventually.contain(string);28});

Full Screen

Using AI Code Generation

copy

Full Screen

1import { Given, When, Then } from 'cucumber';2import { expect } from 'chai';3Given('I am on the homepage', function() {4  browser.pause(2000);5});6When('I enter a search item', function() {7  browser.setValue('#lst-ib', 'Cucumber');8  browser.pause(2000);9});10Then('I should see the search results', function() {11  browser.click('#tsbb');12  browser.pause(2000);13  expect(browser.getTitle()).to.equal('Cucumber - Google Search');14});15exports.config = {16  capabilities: [{17  }],18  cucumberOpts: {

Full Screen

Cucumber Tutorial:

LambdaTest offers a detailed Cucumber testing tutorial, explaining its features, importance, best practices, and more to help you get started with running your automation testing scripts.

Cucumber Tutorial Chapters:

Here are the detailed Cucumber testing chapters to help you get started:

  • Importance of Cucumber - Learn why Cucumber is important in Selenium automation testing during the development phase to identify bugs and errors.
  • Setting Up Cucumber in Eclipse and IntelliJ - Learn how to set up Cucumber in Eclipse and IntelliJ.
  • Running First Cucumber.js Test Script - After successfully setting up your Cucumber in Eclipse or IntelliJ, this chapter will help you get started with Selenium Cucumber testing in no time.
  • Annotations in Cucumber - To handle multiple feature files and the multiple scenarios in each file, you need to use functionality to execute these scenarios. This chapter will help you learn about a handful of Cucumber annotations ranging from tags, Cucumber hooks, and more to ease the maintenance of the framework.
  • Automation Testing With Cucumber And Nightwatch JS - Learn how to build a robust BDD framework setup for performing Selenium automation testing by integrating Cucumber into the Nightwatch.js framework.
  • Automation Testing With Selenium, Cucumber & TestNG - Learn how to perform Selenium automation testing by integrating Cucumber with the TestNG framework.
  • Integrate Cucumber With Jenkins - By using Cucumber with Jenkins integration, you can schedule test case executions remotely and take advantage of the benefits of Jenkins. Learn how to integrate Cucumber with Jenkins with this detailed chapter.
  • Cucumber Best Practices For Selenium Automation - Take a deep dive into the advanced use cases, such as creating a feature file, separating feature files, and more for Cucumber testing.

Run Cucumber-gherkin automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful