Best Python code snippet using localstack_python
clean_owners.py
Source:clean_owners.py  
from collections import defaultdict
from copy import copy
from functools import partial
from datetime import datetime

from mobile_endpoint.case.models import CommCareCase
from mobile_endpoint.models import OwnershipCleanlinessFlag
from mobile_endpoint.restore.cleanliness import get_case_footprint_info
from mobile_endpoint.restore.data_providers.case.load_testing import append_update_to_response
from mobile_endpoint.restore.data_providers.case.utils import get_case_sync_updates, CaseStub
from mobile_endpoint.synclog.models import SimplifiedSyncLog, LOG_FORMAT_SIMPLIFIED, IndexTree


def get_owner_id(case):
    """Return the case's ``owner_id``, falling back to ``user_id`` when it is ``None``."""
    return case.owner_id if case.owner_id is not None else case.user_id


def get_case_payload(restore_state):
    """Run a ``CleanOwnerCaseSyncOperation`` for ``restore_state`` and return its payload."""
    sync_op = CleanOwnerCaseSyncOperation(restore_state)
    return sync_op.get_payload()


# Maximum number of case ids pulled from the pending set per batch in get_payload().
chunk_size = 1000


class CleanOwnerCaseSyncOperation(object):
    """
    Builds the case portion of a restore, using per-owner cleanliness flags
    to decide which case ids are candidates for syncing.
    """

    def __init__(self, restore_state):
        self.restore_state = restore_state
        self.dao = restore_state.dao
        # populated lazily by the ``cleanliness_flags`` property
        self._cleanliness_flags = None

    @property
    def cleanliness_flags(self):
        """
        Dict of ``owner_id -> is_clean`` for this restore's domain and owner ids.

        Queried once on first access and cached on the instance.
        """
        if self._cleanliness_flags is None:
            self._cleanliness_flags = dict(
                OwnershipCleanlinessFlag.query.with_entities(
                    OwnershipCleanlinessFlag.owner_id,
                    OwnershipCleanlinessFlag.is_clean
                ).filter(
                    OwnershipCleanlinessFlag.domain == self.restore_state.domain,
                    OwnershipCleanlinessFlag.owner_id.in_(self.restore_state.owner_ids),
                )
            )
        return self._cleanliness_flags

    def is_clean(self, owner_id):
        """Return the owner's cleanliness flag; owners without a flag count as dirty."""
        return self.cleanliness_flags.get(owner_id, False)

    def get_payload(self):
        """
        Build the restore response and replace the current sync log.

        Collects candidate case ids per owner, processes them in batches of
        ``chunk_size`` (following case indices to pull in referenced cases),
        appends every resulting update to the response, and finally rewrites
        ``restore_state.current_sync_log`` as a ``SimplifiedSyncLog`` carrying
        the case ids, dependent case ids and index tree.

        Returns the response object created by ``restore_state.restore_class()``.
        """
        restore_state = self.restore_state
        response = restore_state.restore_class()

        case_ids_to_sync = set()
        for owner_id in restore_state.owner_ids:
            case_ids_to_sync.update(self.get_case_ids_for_owner(owner_id))

        if (not restore_state.is_initial and
                any(not self.is_clean(owner_id) for owner_id in restore_state.owner_ids)):
            # if it's a steady state sync and we have any dirty owners, then we also need to
            # include ALL cases on the phone that have been modified since the last sync as
            # possible candidates to sync (since they may have been closed or reassigned by someone else)

            # don't bother checking ones we've already decided to check
            other_ids_to_check = restore_state.last_sync_log.case_ids_on_phone - case_ids_to_sync
            case_ids_to_sync.update(filter_cases_modified_since(
                self.dao, restore_state.domain, list(other_ids_to_check),
                restore_state.last_sync_log.date
            ))

        # all_syncing is an independent copy: case_ids_to_sync is drained by pop_ids below
        all_syncing = copy(case_ids_to_sync)
        all_indices = defaultdict(set)
        all_dependencies_syncing = set()
        while case_ids_to_sync:
            ids = pop_ids(case_ids_to_sync, chunk_size)
            needs_sync = partial(case_needs_to_sync, last_sync_log=restore_state.last_sync_log)
            # materialise the batch: on Python 3 ``filter`` would hand back a one-shot iterator
            case_batch = [case for case in self.dao.get_cases(ids) if needs_sync(case)]
            updates = get_case_sync_updates(
                restore_state.domain, case_batch, restore_state.last_sync_log
            )
            for update in updates:
                case = update.case
                append_update_to_response(response, update, restore_state)

                # update the indices in the new sync log
                if case.indices:
                    all_indices[case.id] = {index.identifier: index.referenced_id for index in case.indices}
                    # and double check footprint for non-live cases
                    for index in case.indices:
                        if index.referenced_id not in all_syncing:
                            case_ids_to_sync.add(index.referenced_id)

                if not _is_live(case, restore_state):
                    all_dependencies_syncing.add(case.id)

            # commtrack ledger sections for this batch
            # TODO prototype: support stock
            # commtrack_elements = get_stock_payload(
            #     self.restore_state.project, self.restore_state.stock_settings,
            #     [CaseStub(update.case.id, update.case.type) for update in updates]
            # )
            # response.extend(commtrack_elements)

            # add any new values to all_syncing
            all_syncing |= case_ids_to_sync

        # update sync token - marking it as the new format
        restore_state.current_sync_log = SimplifiedSyncLog.wrap(
            restore_state.current_sync_log.to_json()
        )
        restore_state.current_sync_log.log_format = LOG_FORMAT_SIMPLIFIED

        index_tree = IndexTree(indices=all_indices)
        case_ids_on_phone = all_syncing
        primary_cases_syncing = all_syncing - all_dependencies_syncing
        if not restore_state.is_initial:
            case_ids_on_phone = case_ids_on_phone | restore_state.last_sync_log.case_ids_on_phone
            # subtract primary cases from dependencies since they must be newly primary
            all_dependencies_syncing = all_dependencies_syncing | (
                restore_state.last_sync_log.dependent_case_ids_on_phone -
                primary_cases_syncing
            )
            index_tree = restore_state.last_sync_log.index_tree.apply_updates(index_tree)

        restore_state.current_sync_log.case_ids_on_phone = case_ids_on_phone
        restore_state.current_sync_log.dependent_case_ids_on_phone = all_dependencies_syncing
        restore_state.current_sync_log.index_tree = index_tree

        return response

    def get_case_ids_for_owner(self, owner_id):
        """
        Return the base set of candidate case ids for a single owner.

        Clean owner, initial sync: the owner's open case ids.
        Clean owner, steady state: case ids modified since the last sync log's date.
        Dirty owner: every id in the owner's case footprint (filtered later).
        """
        if self.is_clean(owner_id):
            if self.restore_state.is_initial:
                # for a clean owner's initial sync the base set is just the open ids
                return set(self.dao.get_open_case_ids(self.restore_state.domain, owner_id))
            else:
                # for a clean owner's steady state sync, the base set is anything modified since last sync
                return set(self.dao.get_case_ids_modified_with_owner_since(
                    self.restore_state.domain, owner_id, self.restore_state.last_sync_log.date
                ))
        else:
            # right now just return the whole footprint and do any filtering later
            return get_case_footprint_info(self.dao, self.restore_state.domain, owner_id).all_ids


def _is_live(case, restore_state):
    """
    Given a case and a restore state object, return whether or not the case is "live"
    (directly owned by this sync and open), or "dependent" (needed by another case)
    """
    return not case.closed and get_owner_id(case) in restore_state.owner_ids


def filter_cases_modified_since(dao, domain, case_ids, reference_date):
    """
    Given a domain, case_ids, and a reference date, filter the case ids to only those
    that have been modified since that reference date.

    This is a generator. Ids with no known last-modified date are treated as
    modified on 1900-01-01, so they are skipped for any later reference date.
    """
    last_modified_date_dict = dao.get_last_modified_dates(domain, case_ids)
    for case_id in case_ids:
        if last_modified_date_dict.get(case_id, datetime(1900, 1, 1)) > reference_date:
            yield case_id


def case_needs_to_sync(case, last_sync_log):
    """
    Return True when ``case`` must be sent down given ``last_sync_log``.

    Always True when there is no previous sync log or the phone did not know
    the case's owner. Otherwise True only if the case was modified on or after
    the last sync AND some action after that sync came from a different sync token.
    """
    owner_id = case.owner_id or case.user_id  # need to fallback to user_id for v1 cases
    if not last_sync_log or owner_id not in last_sync_log.owner_ids_on_phone:
        # initial sync or new owner IDs always sync down everything
        return True
    elif case.server_modified_on >= last_sync_log.date:
        # check all of the actions since last sync for one that had a different sync token
        return any(
            action.server_date > last_sync_log.date and action.sync_log_id != last_sync_log.id
            for action in case.actions
        )
    # if the case wasn't touched since last sync, and the phone was aware of this owner_id last time
    # don't worry about it
    return False


def pop_ids(set_, how_many):
    """
    Pop up to ``how_many`` items out of ``set_`` (mutating it) into a list.

    NOTE(review): the snippet is cut off right after the loop on the source
    page. get_payload() passes this function's result to ``dao.get_cases``, so
    a trailing ``return result`` is presumably missing here; confirm against
    the original file.
    """
    result = []
    for i in range(how_many):
        try:
            result.append(set_.pop())
        except KeyError:
            # the set ran out before ``how_many`` items were popped
            pass
...
test_restore_state.py
Source:test_restore_state.py  
"""The tests for the Restore component."""
import asyncio
from datetime import timedelta
from unittest.mock import patch, MagicMock

from homeassistant.setup import setup_component
from homeassistant.const import EVENT_HOMEASSISTANT_START
from homeassistant.core import CoreState, split_entity_id, State
import homeassistant.util.dt as dt_util
from homeassistant.components import input_boolean, recorder
from homeassistant.helpers.restore_state import (
    async_get_last_state, DATA_RESTORE_CACHE)
from homeassistant.components.recorder.models import RecorderRuns, States

from tests.common import (
    get_test_home_assistant, mock_coro, init_recorder_component,
    mock_component)


def _restore_patches(last_run, states, connection_ready):
    """Return patchers for the three restore_state lookups, in entry order.

    Each patcher replaces the named function with a mock returning the
    corresponding argument.
    """
    return (
        patch('homeassistant.helpers.restore_state.last_recorder_run',
              return_value=last_run),
        patch('homeassistant.helpers.restore_state.get_states',
              return_value=states),
        patch('homeassistant.helpers.restore_state.wait_connection_ready',
              return_value=connection_ready),
    )


@asyncio.coroutine
def test_caching_data(hass):
    """Check the cache is filled while starting and dropped on start event."""
    mock_component(hass, 'recorder')
    hass.state = CoreState.starting

    cached_states = [
        State(entity_id, 'on')
        for entity_id in ('input_boolean.b0',
                          'input_boolean.b1',
                          'input_boolean.b2')
    ]

    run_patch, states_patch, ready_patch = _restore_patches(
        MagicMock(end=dt_util.utcnow()), cached_states, mock_coro(True))
    with run_patch, states_patch, ready_patch:
        restored = yield from async_get_last_state(hass, 'input_boolean.b1')

    assert DATA_RESTORE_CACHE in hass.data
    expected_cache = {st.entity_id: st for st in cached_states}
    assert hass.data[DATA_RESTORE_CACHE] == expected_cache

    assert restored is not None
    assert restored.entity_id == 'input_boolean.b1'
    assert restored.state == 'on'

    hass.bus.async_fire(EVENT_HOMEASSISTANT_START)

    yield from hass.async_block_till_done()

    assert DATA_RESTORE_CACHE not in hass.data


@asyncio.coroutine
def test_hass_running(hass):
    """Check no state is returned when hass.state was not set to starting."""
    mock_component(hass, 'recorder')

    cached_states = [
        State(entity_id, 'on')
        for entity_id in ('input_boolean.b0',
                          'input_boolean.b1',
                          'input_boolean.b2')
    ]

    run_patch, states_patch, ready_patch = _restore_patches(
        MagicMock(end=dt_util.utcnow()), cached_states, mock_coro(True))
    with run_patch, states_patch, ready_patch:
        restored = yield from async_get_last_state(hass, 'input_boolean.b1')

    assert restored is None


@asyncio.coroutine
def test_not_connected(hass):
    """Check no state is returned when the connection wait yields False."""
    mock_component(hass, 'recorder')
    hass.state = CoreState.starting

    cached_states = [State('input_boolean.b1', 'on')]

    run_patch, states_patch, ready_patch = _restore_patches(
        MagicMock(end=dt_util.utcnow()), cached_states, mock_coro(False))
    with run_patch, states_patch, ready_patch:
        restored = yield from async_get_last_state(hass, 'input_boolean.b1')

    assert restored is None


@asyncio.coroutine
def test_no_last_run_found(hass):
    """Check no state is returned when there is no last recorder run."""
    mock_component(hass, 'recorder')
    hass.state = CoreState.starting

    cached_states = [State('input_boolean.b1', 'on')]

    run_patch, states_patch, ready_patch = _restore_patches(
        None, cached_states, mock_coro(True))
    with run_patch, states_patch, ready_patch:
        restored = yield from async_get_last_state(hass, 'input_boolean.b1')

    assert restored is None


@asyncio.coroutine
def test_cache_timeout(hass):
    """Check no state is returned when the connection wait times out."""
    mock_component(hass, 'recorder')
    hass.state = CoreState.starting

    cached_states = [State('input_boolean.b1', 'on')]

    @asyncio.coroutine
    def timeout_coro():
        raise asyncio.TimeoutError()

    run_patch, states_patch, ready_patch = _restore_patches(
        MagicMock(end=dt_util.utcnow()), cached_states, timeout_coro())
    with run_patch, states_patch, ready_patch:
        restored = yield from async_get_last_state(hass, 'input_boolean.b1')

    assert restored is None


def _add_data_in_last_run(hass, entities):
    """Store one recorder run plus one state row per entity in the DB.

    ``entities`` maps entity_id to the state string to record.
    """
    # pylint: disable=protected-access
    run_end = dt_util.utcnow() - timedelta(minutes=10)
    state_time = run_end - timedelta(minutes=20)
    run_start = run_end - timedelta(minutes=30)

    with recorder.session_scope(hass=hass) as session:
        last_run = RecorderRuns(
            start=run_start,
            end=run_end,
            created=run_start
        )
        session.add(last_run)

        for entity_id, state in entities.items():
            row = States(
                entity_id=entity_id,
                domain=split_entity_id(entity_id)[0],
                state=state,
                attributes='{}',
                last_changed=state_time,
                last_updated=state_time,
                created=state_time)
            session.add(row)


def test_filling_the_cache():
    """Check input_booleans come up with the states recorded in the DB."""
    test_entity_id1 = 'input_boolean.b1'
    test_entity_id2 = 'input_boolean.b2'

    hass = get_test_home_assistant()
    hass.state = CoreState.starting

    init_recorder_component(hass)

    recorded = {
        test_entity_id1: 'on',
        test_entity_id2: 'off',
    }
    _add_data_in_last_run(hass, recorded)

    hass.block_till_done()
    component_config = {
        input_boolean.DOMAIN: {
            'b1': None,
            'b2': None,
        }}
    setup_component(hass, input_boolean.DOMAIN, component_config)

    hass.start()

    first = hass.states.get('input_boolean.b1')
    assert first
    assert first.state == 'on'

    second = hass.states.get('input_boolean.b2')
    assert second
    assert second.state == 'off'
...
persistent_widgets.py
Source:persistent_widgets.py  
2class PersistentCheckBox(QCheckBox):3    def __init__(self, name, changed=None, *args, **kwargs):4        super().__init__(*args, **kwargs)5        self.name = name6        self.restore_state()7        if changed:8            self.stateChanged.connect(changed)9        self.stateChanged.connect(lambda: QSettings().setValue(self.name, self.checkState()))10    11    def restore_state(self):12        prev_state = QSettings().value(self.name, 0)13        if prev_state == int(Qt.Checked):14            self.setCheckState(Qt.Checked)15        elif prev_state == int(Qt.PartiallyChecked):16            self.setCheckState(Qt.PartiallyChecked)17class PersistentLineEdit(QLineEdit):18    def __init__(self, name, *args, default='', changed=None, **kwargs):19        super().__init__(*args, **kwargs)20        self.name = name21        self.default = default22        self.restore_state()23        if changed:24            self.textChanged.connect(changed)25        self.textChanged.connect(lambda: QSettings().setValue(self.name, self.text()))26    27    def restore_state(self):28        self.setText(str(QSettings().value(self.name, self.default)))29class PersistentTextEdit(QTextEdit):30    def __init__(self, name, *args, default='', changed=None, **kwargs):31        super().__init__(*args, **kwargs)32        self.name = name33        self.default = default34        self.restore_state()35        if changed:36            self.textChanged.connect(changed)37        self.textChanged.connect(lambda: QSettings().setValue(self.name, self.toPlainText()))38    39    def restore_state(self):40        self.setText(str(QSettings().value(self.name, self.default)))41class PersistentListWidget(QListWidget):42    def __init__(self, name, items=[], default=[], changed=None, *args, **kwargs):43        super().__init__(*args, **kwargs)44        self.name = name45        self.default_selection = default46        if items:47            self.addItems(items)48            self.restore_state()49                
50        if changed:51            self.itemSelectionChanged.connect(changed)52        self.itemSelectionChanged.connect(lambda: QSettings().setValue(self.name, self.selected_items()))53    def selected_items(self):54        return [item.text() for item in self.selectedItems()]55    def restore_state(self):56        prev_items = QSettings().value(self.name, self.default_selection)57        if prev_items:58            for i in range(self.count()):59                if self.item(i).text() in prev_items:60                    self.item(i).setSelected(True)61class PersistentTreeWidget(QTreeWidget):62    def __init__(self, name, items=[], index_column=0, default=[], changed=None, *args, **kwargs):63        super().__init__(*args, **kwargs)64        self.name = name65        self.default_selection = default66        self.index_column = index_column67        if items:68            self.addItems(items)69            self.restore_state()70                71        if changed:72            self.itemSelectionChanged.connect(changed)73        self.itemSelectionChanged.connect(lambda: QSettings().setValue(self.name, self.selected_items()))74    def selected_items(self):75        return [item.text(self.index_column) for item in self.selectedItems()]76    def restore_state(self):77        prev_items = QSettings().value(self.name, self.default_selection)78        if prev_items:79            for i in range(self.count()):80                if self.item(i).text(self.index_column) in prev_items:81                    self.item(i).setSelected(True)82class PersistentComboBox(QComboBox):83    def __init__(self, name, items=[], changed=None, *args, **kwargs):84        super().__init__(*args, **kwargs)85        self.name = name86        if items:87            self.addItems(items)88            self.restore_state()89        if changed:90            self.currentTextChanged.connect(changed)91        self.currentTextChanged.connect(lambda: QSettings().setValue(self.name, self.currentIndex()))92    
def restore_state(self):93        prev_index = QSettings().value(self.name, 0)94        if isinstance(prev_index, int):95            self.setCurrentIndex(prev_index)96class PersistentCheckableAction(QAction):97    def __init__(self, name, *args, **kwargs):98        super().__init__(*args, **kwargs)99        self.name = name100        self.setCheckable(True)101        self.restore_state()102        self.triggered.connect(lambda: QSettings().setValue(self.name, self.isChecked()))103    def restore_state(self):104        prev_state = QSettings().value(self.name, 0)105        if prev_state == 'true':106            self.setChecked(True)107        elif prev_state == 'false':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
