Best Python code snippet using pytest-play
test_queues.py
Source: test_queues.py
...
async def deactivate_test(client):
    await remove_all_test_queues(client)
    await client.close()

async def remove_all_test_queues(client):
    await client.execute_command('eval', """
            local keys = unpack(redis.call("keys", ARGV[1]))
            if keys then
                return redis.call("del", keys)
            end
        """, 0, QUEUE_NAME + '*')

@pytest.mark.asyncio
async def test_add_items():
    client, queue_instance = await init_test()
    with patch('aiopyrq.helpers.wait_for_synced_slaves') as slaves_mock:
        items = ['first-message', 'second-message']
        await queue_instance.add_items(items)
        assert items[0] == await client.execute_command('rpop', QUEUE_NAME)
        assert items[1] == await client.execute_command('rpop', QUEUE_NAME)
        assert await client.execute_command('rpop', QUEUE_NAME) is None
        assert 1 == slaves_mock.call_count
    await deactivate_test(client)

@pytest.mark.asyncio
async def test_add_item():
    client, queue_instance = await init_test()
    with patch('aiopyrq.helpers.wait_for_synced_slaves') as slaves_mock:
        for i in [3, 5, 3, 1]:
            await queue_instance.add_item(i)
        assert ['1', '3', '5', '3'] == await client.execute_command('lrange', QUEUE_NAME, 0, 5)
        assert 4 == slaves_mock.call_count
    await deactivate_test(client)

@pytest.mark.asyncio
async def test_get_items():
    client, queue_instance = await init_test()
    with patch('aiopyrq.helpers.wait_for_synced_slaves') as slaves_mock:
        for i in [3, 5, 3, 1]:
            await client.execute_command('lpush', QUEUE_NAME, i)
        assert ['3', '5', '3'] == await queue_instance.get_items(3)
        assert ['1'] == await queue_instance.get_items(1)
        assert [] == await queue_instance.get_items(1)
        await client.execute_command('del', queue_instance.processing_queue_name)
        await client.execute_command('del', queue_instance.timeouts_hash_name)
        assert 0 == slaves_mock.call_count
    await deactivate_test(client)

@pytest.mark.asyncio
async def test_delete_item():
    client, queue_instance = await init_test()
    with patch('aiopyrq.helpers.wait_for_synced_slaves') as slaves_mock:
        for i in [3, 5, 6, 3, 1, 4]:
            await client.execute_command('lpush', QUEUE_NAME, i)
        await queue_instance.delete_item(3)
        assert ['4', '1', '6', '5'] == await client.execute_command('lrange', QUEUE_NAME, 0, -1)
        assert 1 == slaves_mock.call_count
    await deactivate_test(client)

@pytest.mark.asyncio
async def test_delete_items():
    client, queue_instance = await init_test()
    with patch('aiopyrq.helpers.wait_for_synced_slaves') as slaves_mock:
        for i in [3, 5, 6, 3, 1, 4]:
            await client.execute_command('lpush', QUEUE_NAME, i)
        await queue_instance.delete_items([1, 3, 4, 5])
        assert ['6'] == await client.execute_command('lrange', QUEUE_NAME, 0, -1)
        assert 1 == slaves_mock.call_count
    await deactivate_test(client)

@pytest.mark.asyncio
async def test_delete_item_one_occurrence():
    client, queue_instance = await init_test()
    with patch('aiopyrq.helpers.wait_for_synced_slaves') as slaves_mock:
        for i in [3, 5, 6, 3, 1, 4]:
            await client.execute_command('lpush', QUEUE_NAME, i)
        await queue_instance.delete_item(3, 1)
        assert ['4', '1', '6', '5', '3'] == await client.execute_command('lrange', QUEUE_NAME, 0, -1)
        assert 1 == slaves_mock.call_count
    await deactivate_test(client)

@pytest.mark.asyncio
async def test_delete_items_with_several_occurrences():
    client, queue_instance = await init_test()
    with patch('aiopyrq.helpers.wait_for_synced_slaves') as slaves_mock:
        for i in [3, 5, 6, 3, 1, 4, 4, 4]:
            await client.execute_command('lpush', QUEUE_NAME, i)
        await queue_instance.delete_items([1, 3, 4, 5], 1)
        assert ['4', '4', '6', '3'] == await client.execute_command('lrange', QUEUE_NAME, 0, -1)
        assert 1 == slaves_mock.call_count
    await deactivate_test(client)

@pytest.mark.asyncio
async def test_ack_item():
    client, queue_instance = await init_test()
    with patch('aiopyrq.helpers.wait_for_synced_slaves') as slaves_mock:
        await client.execute_command('lpush', queue_instance.processing_queue_name, *[1, 5, 5, 3])
        saved_time = int(time.time())
        await client.execute_command('hset', queue_instance.timeouts_hash_name, queue_instance.processing_queue_name, saved_time)
        for i in [1, 5, 1]:
            await queue_instance.ack_item(i)
        assert ['3', '5'] == await client.execute_command('lrange', queue_instance.processing_queue_name, 0, 5)
        assert {queue_instance.processing_queue_name: str(saved_time)} == await client.execute_command(
            'hgetall', queue_instance.timeouts_hash_name)
        for i in [5, 3]:
            await queue_instance.ack_item(i)
        assert 0 == await client.execute_command('llen', queue_instance.processing_queue_name)
        assert 5 == slaves_mock.call_count
    await deactivate_test(client)

@pytest.mark.asyncio
async def test_ack_items():
    client, queue_instance = await init_test()
    with patch('aiopyrq.helpers.wait_for_synced_slaves') as slaves_mock:
        await client.execute_command('lpush', queue_instance.processing_queue_name, *[1, 5, 5, 3, 6, 7])
        saved_time = int(time.time())
        await client.execute_command('hset', queue_instance.timeouts_hash_name, queue_instance.processing_queue_name, saved_time)
        await queue_instance.ack_items([1, 5])
        await queue_instance.ack_items([1])
        assert ['7', '6', '3', '5'] == await client.execute_command('lrange', queue_instance.processing_queue_name, 0, 5)
        assert {queue_instance.processing_queue_name: str(saved_time)} == await client.execute_command(
            'hgetall', queue_instance.timeouts_hash_name)
        await queue_instance.ack_items([5, 3, 6])
        await queue_instance.ack_items([7])
        assert 0 == await client.execute_command('llen', queue_instance.processing_queue_name)
        assert 4 == slaves_mock.call_count
    await deactivate_test(client)

@pytest.mark.asyncio
async def test_reject_item():
    client, queue_instance = await init_test()
    with patch('aiopyrq.helpers.wait_for_synced_slaves') as slaves_mock:
        await client.execute_command('lpush', queue_instance.processing_queue_name, *[1, 5, 5, 3])
        saved_time = int(time.time())
        await client.execute_command('hset', queue_instance.timeouts_hash_name, queue_instance.processing_queue_name, saved_time)
        await queue_instance.reject_item(1)
        await queue_instance.reject_item(5)
        await queue_instance.reject_item(1)
        assert ['1', '5'] == await client.execute_command('lrange', QUEUE_NAME, 0, 5)
        assert ['3', '5'] == await client.execute_command('lrange', queue_instance.processing_queue_name, 0, 5)
        assert {queue_instance.processing_queue_name: str(saved_time)} == await client.execute_command(
            'hgetall', queue_instance.timeouts_hash_name)
        await queue_instance.reject_item(3)
        await queue_instance.reject_item(5)
        assert ['1', '5', '3', '5'] == await client.execute_command('lrange', QUEUE_NAME, 0, 5)
        assert 0 == await client.execute_command('llen', queue_instance.processing_queue_name)
        assert 5 == slaves_mock.call_count
    await deactivate_test(client)

@pytest.mark.asyncio
async def test_reject_items():
    client, queue_instance = await init_test()
    with patch('aiopyrq.helpers.wait_for_synced_slaves') as slaves_mock:
        await client.execute_command('lpush', queue_instance.processing_queue_name, *[1, 5, 5, 3, 6, 7])
        saved_time = int(time.time())
        await client.execute_command('hset', queue_instance.timeouts_hash_name, queue_instance.processing_queue_name, saved_time)
        await queue_instance.reject_items([1, 5])
        await queue_instance.reject_items([5])
        await queue_instance.reject_items([9])
        assert ['5', '1', '5'] == await client.execute_command('lrange', QUEUE_NAME, 0, 5)
        assert ['7', '6', '3'] == await client.execute_command('lrange', queue_instance.processing_queue_name, 0, 5)
        assert {queue_instance.processing_queue_name: str(saved_time)} == await client.execute_command(
            'hgetall', queue_instance.timeouts_hash_name)
        await queue_instance.reject_items([3, 6, 7])
        assert ['5', '1', '5', '7', '6', '3'] == await client.execute_command('lrange', QUEUE_NAME, 0, 10)
        assert 0 == await client.execute_command('llen', queue_instance.processing_queue_name)
        assert 4 == slaves_mock.call_count
    await deactivate_test(client)

@pytest.mark.asyncio
async def test_integration():
    client, queue_instance = await init_test()
    with patch('aiopyrq.helpers.wait_for_synced_slaves') as slaves_mock:
        await queue_instance.add_items([1, 5, 2, 6, 7])
        assert ['1', '5', '2', '6', '7'] == await queue_instance.get_items(5)
        assert [] == await queue_instance.get_items(1)
        await queue_instance.ack_items([1, 5])
        assert [] == await queue_instance.get_items(1)
        await queue_instance.reject_items([2, 6, 7])
        assert ['2', '6', '7'] == await queue_instance.get_items(5)
        await queue_instance.ack_items([2, 6, 7])
        assert 0 == await client.execute_command('llen', QUEUE_NAME)
        assert 4 == slaves_mock.call_count
    await deactivate_test(client)

@pytest.mark.asyncio
async def test_re_enqueue_timeout_items():
    client, queue_instance = await init_test()
    with patch('aiopyrq.helpers.wait_for_synced_slaves') as slaves_mock:
        microtimestamp = time.time()
        timestamp = int(microtimestamp)
        processing_queue1 = PROCESSING_QUEUE_SCHEMA.format(socket.gethostname(), os.getpid(), timestamp - 15)
        await client.execute_command('lpush', processing_queue1, 1, 5, 3)
        await client.execute_command('hset', TIMEOUT_QUEUE, processing_queue1, microtimestamp - 15)
        processing_queue2 = PROCESSING_QUEUE_SCHEMA.format(socket.gethostname(), os.getpid(), timestamp - 10)
        await client.execute_command('lpush', processing_queue2, 1, 4, 6)
        await client.execute_command('hset', TIMEOUT_QUEUE, processing_queue2, microtimestamp - 10)
        processing_queue3 = PROCESSING_QUEUE_SCHEMA.format(socket.gethostname(), os.getpid(), timestamp - 5)
        await client.execute_command('lpush', processing_queue3, 4, 7, 8)
        await client.execute_command('hset', TIMEOUT_QUEUE, processing_queue3, microtimestamp - 5)
        await queue_instance.re_enqueue_timeout_items(7)
        assert ['6', '4', '1', '3', '5', '1'] == await client.execute_command('lrange', QUEUE_NAME, 0, 10)
        assert ['8', '7', '4'] == await client.execute_command('lrange', processing_queue3, 0, 5)
        assert {processing_queue3: str(microtimestamp - 5)} == await client.execute_command('hgetall', TIMEOUT_QUEUE)
        assert [QUEUE_NAME, processing_queue3, TIMEOUT_QUEUE] == sorted(await client.execute_command('keys', QUEUE_NAME + '*'))
        await queue_instance.re_enqueue_timeout_items(0)
        assert ['6', '4', '1', '3', '5', '1', '8', '7', '4'] == await client.execute_command('lrange', QUEUE_NAME, 0, 10)
        assert [QUEUE_NAME] == await client.execute_command('keys', QUEUE_NAME + '*')
        assert 2 == slaves_mock.call_count
    await deactivate_test(client)

@pytest.mark.asyncio
async def test_re_enqueue_all_times():
    client, queue_instance = await init_test()
    with patch('aiopyrq.helpers.wait_for_synced_slaves') as slaves_mock:
        microtimestamp = time.time()
        timestamp = int(microtimestamp)
        processing_queue1 = PROCESSING_QUEUE_SCHEMA.format(socket.gethostname(), os.getpid(), timestamp - 15)
        await client.execute_command('lpush', processing_queue1, 1, 5, 3)
        await client.execute_command('hset', TIMEOUT_QUEUE, processing_queue1, microtimestamp - 15)
        processing_queue2 = PROCESSING_QUEUE_SCHEMA.format(socket.gethostname(), os.getpid(), timestamp - 10)
        await client.execute_command('lpush', processing_queue2, 1, 4, 6)
        await client.execute_command('hset', TIMEOUT_QUEUE, processing_queue2, microtimestamp - 10)
        processing_queue3 = PROCESSING_QUEUE_SCHEMA.format(socket.gethostname(), os.getpid(), timestamp - 5)
        await client.execute_command('lpush', processing_queue3, 4, 7, 8)
        await client.execute_command('hset', TIMEOUT_QUEUE, processing_queue3, microtimestamp - 5)
        await queue_instance.re_enqueue_all_items()
        assert ['8', '7', '4', '6', '4', '1', '3', '5', '1'] == await client.execute_command('lrange', QUEUE_NAME, 0, 10)
        assert [QUEUE_NAME] == await client.execute_command('keys', QUEUE_NAME + '*')
        assert 1 == slaves_mock.call_count
    await deactivate_test(client)

@pytest.mark.asyncio
async def test_drop_timeout_items():
    client, queue_instance = await init_test()
    with patch('aiopyrq.helpers.wait_for_synced_slaves') as slaves_mock:
        microtimestamp = time.time()
        timestamp = int(microtimestamp)
        processing_queue1 = PROCESSING_QUEUE_SCHEMA.format(socket.gethostname(), os.getpid(), timestamp - 15)
        await client.execute_command('lpush', processing_queue1, 1, 5, 3)
        await client.execute_command('hset', TIMEOUT_QUEUE, processing_queue1, microtimestamp - 15)
        processing_queue2 = PROCESSING_QUEUE_SCHEMA.format(socket.gethostname(), os.getpid(), timestamp - 10)
        await client.execute_command('lpush', processing_queue2, 1, 4, 6)
        await client.execute_command('hset', TIMEOUT_QUEUE, processing_queue2, microtimestamp - 10)
        processing_queue3 = PROCESSING_QUEUE_SCHEMA.format(socket.gethostname(), os.getpid(), timestamp - 5)
        await client.execute_command('lpush', processing_queue3, 4, 7, 8)
        await client.execute_command('hset', TIMEOUT_QUEUE, processing_queue3, microtimestamp - 5)
        await queue_instance.drop_timeout_items(7)
        assert [] == await client.execute_command('lrange', QUEUE_NAME, 0, 5)
        assert ['8', '7', '4'] == await client.execute_command('lrange', processing_queue3, 0, 5)
        assert {processing_queue3: str(microtimestamp - 5)} == await client.execute_command('hgetall', TIMEOUT_QUEUE)
        assert [processing_queue3, TIMEOUT_QUEUE] == sorted(await client.execute_command('keys', QUEUE_NAME + '*'))
        await queue_instance.drop_timeout_items(0)
        assert [] == await client.execute_command('lrange', QUEUE_NAME, 0, 10)
        assert [] == await client.execute_command('keys', QUEUE_NAME + '*')
        assert 2 == slaves_mock.call_count
    await deactivate_test(client)

@pytest.mark.asyncio
async def test_drop_all_items():
    client, queue_instance = await init_test()
    with patch('aiopyrq.helpers.wait_for_synced_slaves') as slaves_mock:
        microtimestamp = time.time()
        timestamp = int(microtimestamp)
        processing_queue1 = PROCESSING_QUEUE_SCHEMA.format(socket.gethostname(), os.getpid(), timestamp - 15)
        await client.execute_command('lpush', processing_queue1, 1, 5, 3)
        await client.execute_command('hset', TIMEOUT_QUEUE, processing_queue1, microtimestamp - 15)
        processing_queue2 = PROCESSING_QUEUE_SCHEMA.format(socket.gethostname(), os.getpid(), timestamp - 10)
        await client.execute_command('lpush', processing_queue2, 1, 4, 6)
        await client.execute_command('hset', TIMEOUT_QUEUE, processing_queue2, microtimestamp - 10)
        processing_queue3 = PROCESSING_QUEUE_SCHEMA.format(socket.gethostname(), os.getpid(), timestamp - 5)
        await client.execute_command('lpush', processing_queue3, 4, 7, 8)
        await client.execute_command('hset', TIMEOUT_QUEUE, processing_queue3, microtimestamp - 5)
        await queue_instance.drop_all_items()
        assert [] == await client.execute_command('lrange', QUEUE_NAME, 0, 10)
        assert [] == await client.execute_command('keys', QUEUE_NAME + '*')
        assert 1 == slaves_mock.call_count
    await deactivate_test(client)

@pytest.mark.asyncio
async def test_rollback_timeout(patch_time):
    max_retry = 3
    max_minutes = 10
    client, queue_instance = await init_test(max_retry_rollback=max_retry, max_timeout_in_queue=max_minutes)
    with patch('aiopyrq.helpers.wait_for_synced_slaves') as slaves_mock:
        items = [1, 2, 3, 4]
        await client.execute_command('lpush', queue_instance.processing_queue_name, *items)
        await queue_instance.drop_all_items()
        unrolledback_items = []
        time_start = int(time.time() / 60)
        time_now = int(time.time() / 60)
        rollback_counter = 0
        for _ in range(100):  # add one for possible inconsistencies in timing
            for item in items[1:]:
                can_rollback = await queue_instance.can_rollback_item(item)
                if can_rollback:
                    rollback_counter += 1
                    # a reject would happen here in normal situation
                else:
                    unrolledback_items.append(item)
            if len(set(unrolledback_items)) == 3:
...
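The init_test() helper these tests rely on is not part of the excerpt above. As a rough orientation only, a setup/teardown pair along the following lines would satisfy what the tests use; the constants and the redis-py asyncio client are assumptions, and the aiopyrq queue construction itself is left abstract since its exact signature is not shown here.

# Hypothetical sketch of the elided init_test()/cleanup pair, assuming redis-py's
# asyncio client; the aiopyrq queue construction is intentionally left abstract.
import os
import redis.asyncio as aioredis

QUEUE_NAME = 'test-queue'

async def init_test_sketch():
    client = aioredis.Redis(
        host=os.getenv('REDIS_HOST', 'localhost'),
        port=int(os.getenv('REDIS_PORT', '6379')),
        decode_responses=True,  # the asserts above compare against str values
    )
    queue_instance = ...  # build the aiopyrq queue bound to QUEUE_NAME here
    return client, queue_instance

async def cleanup_sketch(client):
    # same intent as remove_all_test_queues(): drop every key a test run created
    keys = await client.keys(QUEUE_NAME + '*')
    if keys:
        await client.delete(*keys)
    await client.aclose()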
test_module.py
Source: test_module.py
...
import __builtin__
import math

class MyTestCase(ModuleTestCase('redis-tsdb-module.so')):
    def _get_ts_info(self, redis, key):
        info = redis.execute_command('TS.INFO', key)
        return dict([(info[i], info[i+1]) for i in range(0, len(info), 2)])

    @staticmethod
    def _insert_data(redis, key, start_ts, samples_count, value):
        """
        insert data to key, starting from start_ts, with 1 sec interval between them
        :param redis: redis connection
        :param key: name of time_series
        :param start_ts: beginning of time series
        :param samples_count: number of samples
        :param value: could be a list of samples_count values, or one value. if a list, insert the values in their
        order, if not, insert the single value for all the timestamps
        """
        for i in range(samples_count):
            value_to_insert = value[i] if type(value) == list else value
            assert redis.execute_command('TS.ADD', key, start_ts + i, value_to_insert)

    def _insert_agg_data(self, redis, key, agg_type):
        agg_key = '%s_agg_%s_10' % (key, agg_type)
        assert redis.execute_command('TS.CREATE', key)
        assert redis.execute_command('TS.CREATE', agg_key)
        assert redis.execute_command('TS.CREATERULE', key, agg_type, 10, agg_key)
        values = (31, 41, 59, 26, 53, 58, 97, 93, 23, 84)
        for i in range(10, 50):
            assert redis.execute_command('TS.ADD', key, i, i // 10 * 100 + values[i % 10])
        return agg_key

    @staticmethod
    def _get_series_value(ts_key_result):
        """
        Get only the values from the time stamp series
        :param ts_key_result: the output of ts.range command (pairs of timestamp and value)
        :return: float values of all the values in the series
        """
        return [float(value[1]) for value in ts_key_result]

    @staticmethod
    def _calc_downsampling_series(values, bucket_size, calc_func):
        """
        calculate the downsampling series given the wanted calc_func, by applying calc_func to all the full buckets
        and to the remainder bucket
        :param values: values of original series
        :param bucket_size: bucket size for downsampling
        :param calc_func: function that calculates the wanted rule, for example min/sum/avg
        :return: the values of the series after downsampling
        """
        series = []
        for i in range(1, int(math.ceil(len(values) / float(bucket_size)))):
            curr_bucket_size = bucket_size
            # we don't have enough values for a full bucket anymore
            if (i + 1) * bucket_size > len(values):
                curr_bucket_size = len(values) - i
            series.append(calc_func(values[i * bucket_size: i * bucket_size + curr_bucket_size]))
        return series

    def calc_rule(self, rule, values, bucket_size):
        """
        Calculate the downsampling with the given rule
        :param rule: 'avg' / 'max' / 'min' / 'sum' / 'count'
        :param values: original series values
        :param bucket_size: bucket size for downsampling
        :return: the values of the series after downsampling
        """
        if rule == 'avg':
            return self._calc_downsampling_series(values, bucket_size, lambda x: float(sum(x)) / len(x))
        elif rule in ['sum', 'max', 'min']:
            return self._calc_downsampling_series(values, bucket_size, getattr(__builtin__, rule))
        elif rule == 'count':
            return self._calc_downsampling_series(values, bucket_size, len)

    def test_sanity(self):
        start_ts = 1511885909L
        samples_count = 500
        with self.redis() as r:
            assert r.execute_command('TS.CREATE', 'tester')
            self._insert_data(r, 'tester', start_ts, samples_count, 5)
            expected_result = [[start_ts+i, str(5)] for i in range(samples_count)]
            actual_result = r.execute_command('TS.range', 'tester', start_ts, start_ts + samples_count)
            assert expected_result == actual_result

    def test_rdb(self):
        start_ts = 1511885909L
        samples_count = 500
        data = None
        with self.redis() as r:
            assert r.execute_command('TS.CREATE', 'tester')
            assert r.execute_command('TS.CREATE', 'tester_agg_avg_10')
            assert r.execute_command('TS.CREATE', 'tester_agg_max_10')
            assert r.execute_command('TS.CREATERULE', 'tester', 'AVG', 10, 'tester_agg_avg_10')
            assert r.execute_command('TS.CREATERULE', 'tester', 'MAX', 10, 'tester_agg_max_10')
            self._insert_data(r, 'tester', start_ts, samples_count, 5)
            data = r.execute_command('dump', 'tester')
        with self.redis() as r:
            r.execute_command('RESTORE', 'tester', 0, data)
            expected_result = [[start_ts+i, str(5)] for i in range(samples_count)]
            actual_result = r.execute_command('TS.range', 'tester', start_ts, start_ts + samples_count)
            assert expected_result == actual_result

    def test_rdb_aggregation_context(self):
        """
        Check that the aggregation context of the rules is saved in rdb. Write data with not a full bucket,
        then save it and restore, add more data to the bucket and check the rules results considered the previous data
        that was in that bucket in their calculation. Check on avg and min, since all the other rules use the same
        context as min.
        """
        start_ts = 3
        samples_count = 4  # 1 full bucket and another one with 1 value
        with self.redis() as r:
            assert r.execute_command('TS.CREATE', 'tester')
            assert r.execute_command('TS.CREATE', 'tester_agg_avg_3')
            assert r.execute_command('TS.CREATE', 'tester_agg_min_3')
            assert r.execute_command('TS.CREATERULE', 'tester', 'AVG', 3, 'tester_agg_avg_3')
            assert r.execute_command('TS.CREATERULE', 'tester', 'MIN', 3, 'tester_agg_min_3')
            self._insert_data(r, 'tester', start_ts, samples_count, range(samples_count))
            data_tester = r.execute_command('dump', 'tester')
            data_avg_tester = r.execute_command('dump', 'tester_agg_avg_3')
            data_min_tester = r.execute_command('dump', 'tester_agg_min_3')
        with self.redis() as r:
            r.execute_command('RESTORE', 'tester', 0, data_tester)
            r.execute_command('RESTORE', 'tester_agg_avg_3', 0, data_avg_tester)
            r.execute_command('RESTORE', 'tester_agg_min_3', 0, data_min_tester)
            assert r.execute_command('TS.ADD', 'tester', start_ts + samples_count, samples_count)
            # if the aggregation context wasn't saved, the results were considering only the new value added
            expected_result_avg = [[start_ts, '1'], [start_ts + 3, '3.5']]
            expected_result_min = [[start_ts, '0'], [start_ts + 3, '3']]
            actual_result_avg = r.execute_command('TS.range', 'tester_agg_avg_3', start_ts, start_ts + samples_count)
            assert actual_result_avg == expected_result_avg
            actual_result_min = r.execute_command('TS.range', 'tester_agg_min_3', start_ts, start_ts + samples_count)
            assert actual_result_min == expected_result_min

    def test_sanity_pipeline(self):
        start_ts = 1488823384L
        samples_count = 500
        with self.redis() as r:
            assert r.execute_command('TS.CREATE', 'tester')
            with r.pipeline(transaction=False) as p:
                p.set("name", "danni")
                self._insert_data(p, 'tester', start_ts, samples_count, 5)
                p.execute()
            expected_result = [[start_ts+i, str(5)] for i in range(samples_count)]
            actual_result = r.execute_command('TS.range', 'tester', start_ts, start_ts + samples_count)
            assert expected_result == actual_result

    def test_range_query(self):
        start_ts = 1488823384L
        samples_count = 500
        with self.redis() as r:
            assert r.execute_command('TS.CREATE', 'tester')
            self._insert_data(r, 'tester', start_ts, samples_count, 5)
            expected_result = [[start_ts+i, str(5)] for i in range(100, 151)]
            actual_result = r.execute_command('TS.range', 'tester', start_ts+100, start_ts + 150)
            assert expected_result == actual_result

    def test_range_with_agg_query(self):
        start_ts = 1488823384L
        samples_count = 500
        with self.redis() as r:
            assert r.execute_command('TS.CREATE', 'tester')
            self._insert_data(r, 'tester', start_ts, samples_count, 5)
            expected_result = [[1488823000L, '116'], [1488823500L, '384']]
            actual_result = r.execute_command('TS.range', 'tester', start_ts, start_ts + 500, 'count', 500)
            assert expected_result == actual_result

    def test_compaction_rules(self):
        with self.redis() as r:
            assert r.execute_command('TS.CREATE', 'tester')
            assert r.execute_command('TS.CREATE', 'tester_agg_max_10')
            assert r.execute_command('TS.CREATERULE', 'tester', 'avg', 10, 'tester_agg_max_10')
            start_ts = 1488823384L
            samples_count = 500
            self._insert_data(r, 'tester', start_ts, samples_count, 5)
            actual_result = r.execute_command('TS.RANGE', 'tester_agg_max_10', start_ts, start_ts + samples_count)
            assert len(actual_result) == samples_count/10
            info_dict = self._get_ts_info(r, 'tester')
            assert info_dict == {'chunkCount': 2L, 'lastTimestamp': start_ts + samples_count - 1, 'maxSamplesPerChunk': 360L, 'retentionSecs': 0L, 'rules': [['tester_agg_max_10', 10L, 'AVG']]}

    def test_create_compaction_rule_without_dest_series(self):
        with self.redis() as r:
            assert r.execute_command('TS.CREATE', 'tester')
            with pytest.raises(redis.ResponseError) as excinfo:
                assert r.execute_command('TS.CREATERULE', 'tester', 'MAX', 10, 'tester_agg_max_10')

    def test_create_compaction_rule_twice(self):
        with self.redis() as r:
            assert r.execute_command('TS.CREATE', 'tester')
            assert r.execute_command('TS.CREATE', 'tester_agg_max_10')
            assert r.execute_command('TS.CREATERULE', 'tester', 'MAX', 10, 'tester_agg_max_10')
            with pytest.raises(redis.ResponseError) as excinfo:
                assert r.execute_command('TS.CREATERULE', 'tester', 'MAX', 10, 'tester_agg_max_10')

    def test_create_compaction_rule_and_del_dest_series(self):
        with self.redis() as r:
            assert r.execute_command('TS.CREATE', 'tester')
            assert r.execute_command('TS.CREATE', 'tester_agg_max_10')
            assert r.execute_command('TS.CREATERULE', 'tester', 'AVG', 10, 'tester_agg_max_10')
            assert r.delete('tester_agg_max_10')
            start_ts = 1488823384L
            samples_count = 500
            self._insert_data(r, 'tester', start_ts, samples_count, 5)

    def test_delete_rule(self):
        with self.redis() as r:
            assert r.execute_command('TS.CREATE', 'tester')
            assert r.execute_command('TS.CREATE', 'tester_agg_max_10')
            assert r.execute_command('TS.CREATERULE', 'tester', 'AVG', 10, 'tester_agg_max_10')
            with pytest.raises(redis.ResponseError) as excinfo:
                assert r.execute_command('TS.DELETERULE', 'tester', 'non_existent')
            assert len(self._get_ts_info(r, 'tester')['rules']) == 1
            assert r.execute_command('TS.DELETERULE', 'tester', 'tester_agg_max_10')
            assert len(self._get_ts_info(r, 'tester')['rules']) == 0

    def test_empty_series(self):
        with self.redis() as r:
            assert r.execute_command('TS.CREATE', 'tester')
            assert r.execute_command('DUMP', 'tester')

    def test_incrby_reset(self):
        with self.redis() as r:
            r.execute_command('ts.create', 'tester')
            i = 0
            time_bucket = 10
            start_time = int(time.time())
            start_time = start_time - start_time % time_bucket
            while i < 1000:
                i += 1
                r.execute_command('ts.incrby', 'tester', '1', 'RESET', time_bucket)
            assert r.execute_command('TS.RANGE', 'tester', 0, int(time.time())) == [[start_time, '1000']]

    def test_incrby(self):
        with self.redis() as r:
            r.execute_command('ts.create', 'tester')
            start_incr_time = int(time.time())
            for i in range(20):
                r.execute_command('ts.incrby', 'tester', '5')
            time.sleep(1)
            start_decr_time = int(time.time())
            for i in range(20):
                r.execute_command('ts.decrby', 'tester', '1')
            assert r.execute_command('TS.RANGE', 'tester', 0, int(time.time())) == [[start_incr_time, '100'], [start_decr_time, '80']]

    def test_agg_min(self):
        with self.redis() as r:
            agg_key = self._insert_agg_data(r, 'tester', 'min')
            expected_result = [[10, '123'], [20, '223'], [30, '323'], [40, '423']]
            actual_result = r.execute_command('TS.RANGE', agg_key, 10, 50)
            assert expected_result == actual_result

    def test_agg_max(self):
        with self.redis() as r:
            agg_key = self._insert_agg_data(r, 'tester', 'max')
            expected_result = [[10, '197'], [20, '297'], [30, '397'], [40, '497']]
            actual_result = r.execute_command('TS.RANGE', agg_key, 10, 50)
            assert expected_result == actual_result

    def test_agg_avg(self):
        with self.redis() as r:
            agg_key = self._insert_agg_data(r, 'tester', 'avg')
            expected_result = [[10, '156.5'], [20, '256.5'], [30, '356.5'], [40, '456.5']]
            actual_result = r.execute_command('TS.RANGE', agg_key, 10, 50)
            assert expected_result == actual_result

    def test_agg_sum(self):
        with self.redis() as r:
            agg_key = self._insert_agg_data(r, 'tester', 'sum')
            expected_result = [[10, '1565'], [20, '2565'], [30, '3565'], [40, '4565']]
            actual_result = r.execute_command('TS.RANGE', agg_key, 10, 50)
            assert expected_result == actual_result

    def test_agg_count(self):
        with self.redis() as r:
            agg_key = self._insert_agg_data(r, 'tester', 'count')
            expected_result = [[10, '10'], [20, '10'], [30, '10'], [40, '10']]
            actual_result = r.execute_command('TS.RANGE', agg_key, 10, 50)
            assert expected_result == actual_result

    def test_agg_first(self):
        with self.redis() as r:
            agg_key = self._insert_agg_data(r, 'tester', 'first')
            expected_result = [[10, '131'], [20, '231'], [30, '331'], [40, '431']]
            actual_result = r.execute_command('TS.RANGE', agg_key, 10, 50)
            assert expected_result == actual_result

    def test_agg_last(self):
        with self.redis() as r:
            agg_key = self._insert_agg_data(r, 'tester', 'last')
            expected_result = [[10, '184'], [20, '284'], [30, '384'], [40, '484']]
            actual_result = r.execute_command('TS.RANGE', agg_key, 10, 50)
            assert expected_result == actual_result

    def test_downsampling_rules(self):
        """
        Test downsampling rules - avg,min,max,count,sum with 4 keys each.
        Downsample in resolution of:
        1sec (should be the same length as the original series),
        3sec (number of samples is divisible by 10),
        10s (number of samples is not divisible by 10),
        1000sec (series should be empty since there are not enough samples)
        Insert some data and check that the length, the values and the info of the downsample series are as expected.
        """
        with self.redis() as r:
            assert r.execute_command('TS.CREATE', 'tester')
            rules = ['avg', 'sum', 'count', 'max', 'min']
            resolutions = [1, 3, 10, 1000]
            for rule in rules:
                for resolution in resolutions:
                    assert r.execute_command('TS.CREATE', 'tester_{}_{}'.format(rule, resolution))
                    assert r.execute_command('TS.CREATERULE', 'tester', rule, resolution,
                                             'tester_{}_{}'.format(rule, resolution))
            start_ts = 0
            samples_count = 501
            end_ts = start_ts + samples_count
            values = range(samples_count)
            self._insert_data(r, 'tester', start_ts, samples_count, values)
            for rule in rules:
                for resolution in resolutions:
                    actual_result = r.execute_command('TS.RANGE', 'tester_{}_{}'.format(rule, resolution),
                                                      start_ts, end_ts)
                    assert len(actual_result) == math.ceil((samples_count - resolution) / float(resolution))
                    expected_result = self.calc_rule(rule, values, resolution)
                    assert self._get_series_value(actual_result) == expected_result
                    # last time stamp should be the beginning of the last bucket
                    assert self._get_ts_info(r, 'tester_{}_{}'.format(rule, resolution))['lastTimestamp'] == \
...
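To make the bucket arithmetic behind _calc_downsampling_series and calc_rule concrete, here is a small standalone illustration of fixed-size bucket downsampling. It is independent of the test class above, written for Python 3, and the helper name is invented.

# Standalone illustration of fixed-size bucket downsampling, the idea the
# compaction-rule tests above verify against TS.CREATERULE output.
import math

def downsample(values, bucket_size, calc_func):
    buckets = int(math.ceil(len(values) / float(bucket_size)))
    return [
        calc_func(values[i * bucket_size:(i + 1) * bucket_size])
        for i in range(buckets)
    ]

samples = list(range(25))            # 25 samples, bucket size 10
print(downsample(samples, 10, min))  # [0, 10, 20]
print(downsample(samples, 10, sum))  # [45, 145, 110]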
http_method.py
Source: http_method.py
#coding=utf-8
import sys
sys.path.append('../')
import json
from wish import wish_methods
from joom import joom_methods
from urllib.parse import unquote
from amazon import interface_products
from amazon import interface_sellers
from amazon import interface_orders
from amazon import interface_recommendations
from amazon import interface_finances
from amazon import interface_fulfillmentInventory
from amazon import interface_fulfillmentOutboundShipment
from amazon import interface_feeds
import requests
# from wish import interface_wish_order
# from wish import interface_wish_faq
# from wish import interface_wish_product
# from wish import interface_wish_ticket
# from wish import interface_wish_notifications
# from joom import interface_joom_order

def amazon_execute_method_product(request):
    print('/amazon_execute/product is recieved a post request')
    data = request.request.body
    # print(data)
    data = data.decode('utf-8').split('&')
    execute_command = {}
    for item in data:
        execute_command[unquote(item.split('=')[0])] = unquote(item.split('=')[1])
    method = eval('interface_products.interface_products.'+execute_command['method'])
    # the posted data carries a 'method' key; eval looks up the method of that name
    print(execute_command)
    return_data = method(execute_command)
    # process the request parameters
    result = return_data
    return result

def amazon_execute_method_order(request):
    print('/amazon_execute/order is recieved a post request')
    data = request.request.body
    # print(data)
    data = data.decode('utf-8').split('&')
    execute_command = {}
    for item in data:
        execute_command[unquote(item.split('=')[0])] = unquote(item.split('=')[1])
    method = eval('interface_orders.interface_orders.'+execute_command['method'])
    # the posted data carries a 'method' key; eval looks up the method of that name
    print(execute_command)
    return_data = method(execute_command)
    # process the request parameters
    result = return_data
    return result

def amazon_execute_method_seller(request):
    print('/amazon_execute/seller is recieved a post request')
    data = request.request.body
    # print(data)
    data = data.decode('utf-8').split('&')
    execute_command = {}
    for item in data:
        execute_command[unquote(item.split('=')[0])] = unquote(item.split('=')[1])
    method = eval('interface_sellers.interface_sellers.'+execute_command['method'])
    # the posted data carries a 'method' key; eval looks up the method of that name
    print(execute_command)
    return_data = method(execute_command)
    # process the request parameters
    result = return_data
    return result

def amazon_execute_method_fulfillment_inbound_shipment(request):
    print('/amazon_execute/fulfillment_inbound_shipment is recieved a post request')
    data = request.request.body
    # print(data)
    data = data.decode('utf-8').split('&')
    execute_command = {}
    for item in data:
        execute_command[unquote(item.split('=')[0])] = unquote(item.split('=')[1])
    method = eval('interface_fulfillment_inbound_shipment.interface_fulfillment_inbound_shipment.'+execute_command['method'])
    # the posted data carries a 'method' key; eval looks up the method of that name
    print(execute_command)
    return_data = method(execute_command)
    # process the request parameters
    result = return_data
    return result

# fulfillment inventory
def amazon_execute_method_fulfillment_inventory(request):
    print('/amazon_execute/fulfillmentInventory is recieved a post request')
    data = request.request.body
    # print(data)
    data = data.decode('utf-8').split('&')
    execute_command = {}
    for item in data:
        execute_command[unquote(item.split('=')[0])] = unquote(item.split('=')[1])
    method = eval('interface_fulfillmentInventory.interface_fulfillmentInventory.' + execute_command['method'])  # eval looks up the method named in the posted 'method' key
    print(execute_command)
    return_data = method(execute_command)  # process the request parameters
    result = return_data
    return result

# fulfillment outbound shipment
def amazon_execute_method_fulfillment_outbound_shipment(request):
    print('/amazon_execute/fulfillmentOutboundShipment is recieved a post request')
    data = request.request.body
    # print(data)
    data = data.decode('utf-8').split('&')
    execute_command = {}
    for item in data:
        execute_command[unquote(item.split('=')[0])] = unquote(item.split('=')[1])
    method = eval('interface_fulfillmentOutboundShipment.interface_fulfillmentOutboundShipment.' + execute_command['method'])  # eval looks up the method named in the posted 'method' key
    print(execute_command)
    return_data = method(execute_command)  # process the request parameters
    result = return_data
    return result

# finances
def amazon_execute_method_finances(request):
    print('/amazon_execute/finances is recieved a post request')
    data = request.request.body
    print(data)
    data = data.decode('utf-8').split('&')
    execute_command = {}
    for item in data:
        execute_command[unquote(item.split('=')[0])] = unquote(item.split('=')[1])
    method = eval('interface_finances.interface_finances.'+execute_command['method'])  # eval looks up the method named in the posted 'method' key
    print(execute_command)
    return_data = method(execute_command)   # process the request parameters
    result = return_data
    return result

def amazon_execute_method_recommendation(request):
    print('/amazon_execute/recommendation is recieved a post request')
    data = request.request.body
    print(data)
    data = data.decode('utf-8').split('&')
    execute_command = {}
    for item in data:
        execute_command[unquote(item.split('=')[0])] = unquote(item.split('=')[1])
    method = eval('interface_recommendations.interface_recommendations.'+execute_command['method'])
    # the posted data carries a 'method' key; eval looks up the method of that name
    print(execute_command)
    return_data = method(execute_command)
    # process the request parameters
    result = return_data
    return result

# reports
def amazon_execute_method_reports(request):
    print('/amazon_execute/reports is recieved a post request')
    data = request.request.body
    print(data)
    data = data.decode('utf-8').split('&')
    execute_command = {}
    for item in data:
        execute_command[unquote(item.split('=')[0])] = unquote(item.split('=')[1])
    method = eval('interface_reports.interface_reports.'+execute_command['method'])  # eval looks up the method named in the posted 'method' key
    print(execute_command)
    return_data = method(execute_command)   # process the request parameters
    result = return_data
    return result

def amazon_execute_method_merchant_fulfillment(request):
    print('/amazon_execute/merchant_fulfillment is recieved a post request')
    data = request.request.body
    print(data)
    data = data.decode('utf-8').split('&')
    execute_command = {}
    for item in data:
        execute_command[unquote(item.split('=')[0])] = unquote(item.split('=')[1])
    method = eval('interface_merchant_fulfillment.interface_merchant_fulfillment.'+execute_command['method'])
    # the posted data carries a 'method' key; eval looks up the method of that name
    print(execute_command)
    return_data = method(execute_command)
    # process the request parameters
    result = return_data
    return result

# subscriptions
def amazon_execute_method_subscriptions(request):
    print('/amazon_execute/subscriptions is recieved a post request')
    data = request.request.body      # incoming request parameters
    print(data)
    data = data.decode('utf-8').split('&')
    execute_command = {}
    for item in data:
        execute_command[unquote(item.split('=')[0])] = unquote(item.split('=')[1])
    method = eval('interface_subscriptions.interface_subscriptions.'+execute_command['method'])  # eval looks up the method named in the posted 'method' key
    print(execute_command)
    return_data = method(execute_command)   # process the request parameters
    result = return_data
    return result

def amazon_execute_method_feed(request):
    print('/amazon_execute/feed is recieved a post request')
    data = request.request.body      # incoming request parameters
    print(data)
    data = data.decode('utf-8').split('&')
    execute_command = {}
    for item in data:
        execute_command[unquote(item.split('=')[0])] = unquote(item.split('=')[1])
    method = eval('interface_feeds.interface_feeds.'+execute_command['method'])  # eval looks up the method named in the posted 'method' key
    print(execute_command)
    return_data = method(execute_command)   # process the request parameters
    result = return_data
    return result

#================================================================
#WISH
def wish_execute_order(request):
    print('/wish/orders is recieved a get request')
    data = request.request.body  # incoming request parameters
    print(data)
    data = data.decode('utf-8').split('&')
    execute_command = {}
    for item in data:
        execute_command[unquote(item.split('=')[0])] = unquote(item.split('=')[1])
    method = eval('interface_wish_order.Order.' + execute_command['method'])  # eval looks up the method named in the posted 'method' key
    print(execute_command)
    return_data = method(execute_command)  # process the request parameters
    result = return_data
    return result

def wish_execute_faq(request):
    data = request.request.body  # incoming request parameters
    print(data)
    data = data.decode('utf-8').split('&')
    execute_command = {}
    for item in data:
        execute_command[unquote(item.split('=')[0])] = unquote(item.split('=')[1])
    method = eval('interface_wish_faq.Faq.' + execute_command['method'])  # eval looks up the method named in the posted 'method' key
    print(execute_command)
    return_data = method(execute_command)  # process the request parameters
    result = return_data
    return result

def wish_execute_product(request):
    data = request.request.body  # incoming request parameters
    print(data)
    data = data.decode('utf-8').split('&')
    execute_command = {}
    for item in data:
        execute_command[unquote(item.split('=')[0])] = unquote(item.split('=')[1])
    method = eval('interface_wish_product.Product.' + execute_command['method'])  # eval looks up the method named in the posted 'method' key
    print(execute_command)
    return_data = method(execute_command)  # process the request parameters
    result = return_data
    return result

def wish_execute_ticket(request):
    data = request.request.body  # incoming request parameters
    print(data)
    data = data.decode('utf-8').split('&')
    execute_command = {}
    for item in data:
        execute_command[unquote(item.split('=')[0])] = unquote(item.split('=')[1])
    method = eval('interface_wish_ticket.Ticket.' + execute_command['method'])  # eval looks up the method named in the posted 'method' key
    print(execute_command)
    return_data = method(execute_command)  # process the request parameters
    result = return_data
    return result

def wish_execute_notifications(request):
    data = request.request.body  # incoming request parameters
    print(data)
    data = data.decode('utf-8').split('&')
    execute_command = {}
    for item in data:
        execute_command[unquote(item.split('=')[0])] = unquote(item.split('=')[1])
    method = eval('interface_wish_notifications.Notifications.' + execute_command['method'])  # eval looks up the method named in the posted 'method' key
    print(execute_command)
    return_data = method(execute_command)  # process the request parameters
    result = return_data
    return result

#joom
def joom_execute_order(request):
    print('/wish/orders is recieved a get request')
    data = request.request.body  # incoming request parameters
    print(data)
    data = data.decode('utf-8').split('&')
    execute_command = {}
    for item in data:
        execute_command[unquote(item.split('=')[0])] = unquote(item.split('=')[1])
    method = eval('interface_joom_order.Order.' + execute_command['method'])  # eval looks up the method named in the posted 'method' key
    print(execute_command)
    return_data = method(execute_command)  # process the request parameters
    result = return_data
...
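Every handler above follows the same parse-and-dispatch shape: decode the urlencoded POST body into a dict, then call the method named by its 'method' field. A condensed, self-contained sketch of that shape is shown below; the names are illustrative only, and getattr stands in for the eval-based lookup used in the file.

# Condensed sketch of the parse-and-dispatch pattern shared by the handlers above.
# DemoInterface and list_orders are illustrative stand-ins, not the real modules.
from urllib.parse import unquote

class DemoInterface:
    @staticmethod
    def list_orders(params):
        return {'called': 'list_orders', 'params': params}

def dispatch(raw_body: bytes):
    # decode the urlencoded POST body into a dict, exactly as the handlers do
    execute_command = {}
    for item in raw_body.decode('utf-8').split('&'):
        key, _, value = item.partition('=')
        execute_command[unquote(key)] = unquote(value)
    # look the target method up by name (getattr as a safer stand-in for eval)
    method = getattr(DemoInterface, execute_command['method'])
    return method(execute_command)

print(dispatch(b'method=list_orders&MarketplaceId=ATVPDKIKX0DER'))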
switch.py
Source: switch.py
...
    is_on: Callable[[Callable[[str], OverkizStateType]], bool] | None = None

SWITCH_DESCRIPTIONS: list[OverkizSwitchDescription] = [
    OverkizSwitchDescription(
        key=UIWidget.DOMESTIC_HOT_WATER_TANK,
        turn_on=lambda execute_command: execute_command(
            OverkizCommand.SET_FORCE_HEATING, OverkizCommandParam.ON
        ),
        turn_off=lambda execute_command: execute_command(
            OverkizCommand.SET_FORCE_HEATING, OverkizCommandParam.OFF
        ),
        is_on=lambda select_state: (
            select_state(OverkizState.IO_FORCE_HEATING) == OverkizCommandParam.ON
        ),
        icon="mdi:water-boiler",
    ),
    OverkizSwitchDescription(
        key=UIClass.ON_OFF,
        turn_on=lambda execute_command: execute_command(OverkizCommand.ON),
        turn_off=lambda execute_command: execute_command(OverkizCommand.OFF),
        is_on=lambda select_state: (
            select_state(OverkizState.CORE_ON_OFF) == OverkizCommandParam.ON
        ),
        device_class=SwitchDeviceClass.OUTLET,
    ),
    OverkizSwitchDescription(
        key=UIClass.SWIMMING_POOL,
        turn_on=lambda execute_command: execute_command(OverkizCommand.ON),
        turn_off=lambda execute_command: execute_command(OverkizCommand.OFF),
        is_on=lambda select_state: (
            select_state(OverkizState.CORE_ON_OFF) == OverkizCommandParam.ON
        ),
        icon="mdi:pool",
    ),
    OverkizSwitchDescription(
        key=UIWidget.RTD_INDOOR_SIREN,
        turn_on=lambda execute_command: execute_command(OverkizCommand.ON),
        turn_off=lambda execute_command: execute_command(OverkizCommand.OFF),
        icon="mdi:bell",
    ),
    OverkizSwitchDescription(
        key=UIWidget.RTD_OUTDOOR_SIREN,
        turn_on=lambda execute_command: execute_command(OverkizCommand.ON),
        turn_off=lambda execute_command: execute_command(OverkizCommand.OFF),
        icon="mdi:bell",
    ),
    OverkizSwitchDescription(
        key=UIWidget.STATELESS_ALARM_CONTROLLER,
        turn_on=lambda execute_command: execute_command(OverkizCommand.ALARM_ON),
        turn_off=lambda execute_command: execute_command(OverkizCommand.ALARM_OFF),
        icon="mdi:shield-lock",
    ),
    OverkizSwitchDescription(
        key=UIWidget.STATELESS_EXTERIOR_HEATING,
        turn_on=lambda execute_command: execute_command(OverkizCommand.ON),
        turn_off=lambda execute_command: execute_command(OverkizCommand.OFF),
        icon="mdi:radiator",
    ),
    OverkizSwitchDescription(
        key=UIWidget.MY_FOX_SECURITY_CAMERA,
        name="Camera Shutter",
        turn_on=lambda execute_command: execute_command(OverkizCommand.OPEN),
        turn_off=lambda execute_command: execute_command(OverkizCommand.CLOSE),
        icon="mdi:camera-lock",
        is_on=lambda select_state: (
            select_state(OverkizState.MYFOX_SHUTTER_STATUS)
            == OverkizCommandParam.OPENED
        ),
        entity_category=EntityCategory.CONFIG,
    ),
]

SUPPORTED_DEVICES = {
    description.key: description for description in SWITCH_DESCRIPTIONS
}

async def async_setup_entry(
    hass: HomeAssistant,
    entry: ConfigEntry,
...
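The descriptions above all follow one pattern: each entry holds turn_on/turn_off callbacks that receive an execute_command callable supplied by the entity at runtime. Below is a self-contained sketch of that callback-injection idea, independent of Home Assistant's actual classes; every name in it is illustrative.

# Self-contained sketch of the callback-injection pattern used by
# OverkizSwitchDescription; every name below is illustrative only.
from dataclasses import dataclass
from typing import Any, Callable, Optional

@dataclass
class SwitchDescriptionSketch:
    key: str
    turn_on: Callable[[Callable[..., Any]], Any]
    turn_off: Callable[[Callable[..., Any]], Any]
    is_on: Optional[Callable[[Callable[[str], Any]], bool]] = None

def execute_command(command: str, *params: Any) -> None:
    # stand-in for the hub call that would actually reach the device
    print('execute', command, params)

on_off = SwitchDescriptionSketch(
    key='OnOff',
    turn_on=lambda execute: execute('on'),
    turn_off=lambda execute: execute('off'),
    is_on=lambda select_state: select_state('core:OnOffState') == 'on',
)

on_off.turn_on(execute_command)               # prints: execute on ()
print(on_off.is_on(lambda state_name: 'on'))  # True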
