Best Python code snippet using localstack_python
test_restore.py
Source:test_restore.py  
1# -*- coding: utf-8 -*-2from abc import abstractmethod3import base644from uuid import uuid45import time6import datetime7from mobile_endpoint.backends.manager import get_dao8from mobile_endpoint.case import const9from mobile_endpoint.case.xml import V210from mobile_endpoint.restore import xml11from tests.dummy import dummy_user, dummy_restore_xml12from tests.mock import CaseFactory, CaseStructure, CaseRelationship13from tests.utils import check_xml_line_by_line14DOMAIN = 'test_domain'15class RestoreTestMixin(object):16    user_id = str(uuid4())17    case_id = str(uuid4())18    @abstractmethod19    def _get_backend(self):20        pass21    @abstractmethod22    def _get_all_synclogs(self):23        pass24    @abstractmethod25    def _get_one_synclog(self):26        pass27    @abstractmethod28    def _get_synclog_by_previous_id(self, id):29        pass30    @abstractmethod31    def _get_restore_url_snippet(self):32        pass33    def test_user_restore(self, testapp, client):34        assert 0 == len(self._get_all_synclogs())35        user = dummy_user(self.user_id)36        restore_payload = generate_restore_response(client, DOMAIN, user, self._get_restore_url_snippet())37        synclogs = self._get_all_synclogs()38        assert len(synclogs) == 139        synclog = synclogs[0]40        check_xml_line_by_line(41            dummy_restore_xml(user, synclog.id, items=3),42            restore_payload,43        )44    def test_user_restore_with_case(self, testapp, client):45        with testapp.app_context():46            factory = CaseFactory(47                self._get_backend(),48                client,49                domain=DOMAIN,50                case_defaults={51                    'user_id': self.user_id,52                    'owner_id': self.user_id,53                    'case_type': 'duck',54                }55            )56            new_case, = factory.create_or_update_case(57                CaseStructure(self.case_id, attrs={58                    'create': True,59                    'case_name': 'Fish',60                    'update': {'last_name': 'Mooney'}})61            )62        user = dummy_user(self.user_id)63        restore_payload = generate_restore_response(client, DOMAIN, user, self._get_restore_url_snippet())64        synclog = self._get_one_synclog()65        case_xml = xml.get_case_xml(66            new_case, [67                const.CASE_ACTION_CREATE,68                const.CASE_ACTION_UPDATE],69            version=V2)70        check_xml_line_by_line(71            dummy_restore_xml(user, synclog.id, case_xml=case_xml, items=4),72            restore_payload,73        )74    def test_get_last_modified_dates(self, testapp, client):75        """76        I was having a little trouble manufacturing a scenario that would call77        this Dao.get_last_modified_dates() with meaningful parameters, so I'm78        testing it directly.79        """80        user_id = str(uuid4())81        owner_id = str(uuid4())82        with testapp.app_context():83            factory = CaseFactory(self._get_backend(), client, domain=DOMAIN, case_defaults={84                'user_id': user_id,85                'owner_id': owner_id,86                'case_type': 'duck',87            })88            child, parent = factory.create_or_update_case(89                CaseStructure(90                    attrs={'create': True, 'case_type': 'duckling'},91                    relationships=[92                        CaseRelationship(93                            CaseStructure(attrs={'case_type': 'duck'})94                        ),95                    ])96            )97            ids = [child.id, parent.id]98            dao = get_dao(self._get_backend())99            id_date_map = dao.get_last_modified_dates(DOMAIN, ids)100            assert set(id_date_map.keys()) == set(ids)101    def test_sync_token(self, testapp, client):102        """103        Tests sync token / sync mode support104        """105        self.test_user_restore_with_case(testapp, client)106        user = dummy_user(self.user_id)107        synclog = self._get_one_synclog()108        synclog_id = synclog.id109        restore_payload = generate_restore_response(client, DOMAIN, user, self._get_restore_url_snippet(), since=synclog_id)110        new_synclog = self._get_synclog_by_previous_id(synclog_id)111        new_synclog_id = new_synclog.id112        # should no longer have a case block in the restore XML113        check_xml_line_by_line(114            dummy_restore_xml(user, new_synclog_id, items=3),115            restore_payload,116        )117        time.sleep(1)  # current jsonobject doesn't support microseconds in datetime fields118        # update the case119        with testapp.app_context():120            factory = CaseFactory(121                self._get_backend(),122                client,123                domain=DOMAIN,124                case_defaults={125                    'user_id': str(uuid4()),126                    'owner_id': self.user_id,127                    'case_type': 'duck',128                }129            )130            updated_case, = factory.create_or_update_case(131                CaseStructure(self.case_id, attrs={'update': {'occupation': 'restaurant owner'}}),132            )133        new_restore_payload = generate_restore_response(client, DOMAIN, user, self._get_restore_url_snippet(), since=new_synclog_id)134        new_new_synclog = self._get_synclog_by_previous_id(new_synclog_id)135        case_xml = xml.get_case_xml(updated_case, [const.CASE_ACTION_UPDATE], version=V2)136        # case block should come back137        check_xml_line_by_line(138            dummy_restore_xml(user, new_new_synclog.id, case_xml=case_xml, items=4),139            new_restore_payload,140        )141def generate_restore_response(client, domain, user, url_snippet, since=None):142    headers = {'Authorization': 'Basic ' + base64.b64encode('{}:{}'.format(user.username, user.password))}143    result = client.get(144        'ota/{url_snippet}/{domain}?version=2.0&items=true&user_id={user_id}{since}'.format(145            # TODO: I'm sure there is a better way to get different urls for the different backends...146            url_snippet=url_snippet,147            domain=domain,148            user_id=user.user_id,149            since='&since={}'.format(since) if since else ''),150        headers=headers151    )152    assert result.status_code == 200...test_receiver.py
Source:test_receiver.py  
...33        )34        [case_block] = factory.create_or_update_cases([35            CaseStructure(attrs=case_attrs),36        ])37        restore_payload = get_restore_payload(config.restore_url, config.domain, dummy_user(user_id))38        synclog_id = synclog_id_from_restore_payload(restore_payload)39        case_xml = case_block.as_string()40        check_xml_line_by_line(41            dummy_restore_xml(dummy_user(user_id), synclog_id, case_xml=case_xml, items=4),42            restore_payload,43        )44    def test_basic_workflow(self, config):45        """46        Sync, create a case47        Verify sync doesn't contain case48        Update case by another user49        Verify sync contains updated case50        """51        user_id = str(uuid4())52        case_id = str(uuid4())53        user = dummy_user(user_id)54        initial_payload = get_restore_payload(config.restore_url, config.domain, user)55        synclog_id = synclog_id_from_restore_payload(56            initial_payload57        )58        # payload should not contain any cases59        check_xml_line_by_line(60            dummy_restore_xml(user, synclog_id, items=3),61            initial_payload,62        )63        factory = CaseFactory(64            config.receiver_url,65            domain=config.domain,66            form_extras={67                'user_id': user_id,68            }69        )70        case_attrs = {71            'create': True,72            'user_id': user_id,73            'owner_id': user_id,74            'case_type': 'gangster',75            'case_name': 'Fish',76            'update': {'last_name': 'Mooney'}77        }78        factory.create_or_update_case(79            CaseStructure(case_id, attrs=case_attrs),80            form_extras={'headers': {'last_sync_token': synclog_id}}81        )82        restore_payload = get_restore_payload(config.restore_url, config.domain, user, since=synclog_id)83        new_synclog_id = synclog_id_from_restore_payload(restore_payload)84        # restore still does not contain case85        check_xml_line_by_line(86            dummy_restore_xml(user, new_synclog_id, items=3),87            restore_payload,88        )89        # update the case90        case_updates = {'cover_job': 'restaurant owner'}91        date_modified = datetime.utcnow()92        factory.create_or_update_case(93            CaseStructure(case_id, attrs={'update': case_updates, 'date_modified': date_modified}),94            form_extras={95                'user_id': user_id,96                # 'headers': {97                #     'last_sync_token': new_synclog_id98                }#}99        )100        restore_payload = get_restore_payload(config.restore_url, config.domain, user, since=new_synclog_id)101        new_new_synclog_id = synclog_id_from_restore_payload(restore_payload)102        case_attrs['create'] = False103        case_attrs['update'].update(case_updates)104        case_block = CaseBlock(case_id, date_modified=date_modified, **case_attrs)105        # restore contain case106        check_xml_line_by_line(107            dummy_restore_xml(user, new_new_synclog_id, case_xml=case_block.as_string(), items=4),108            restore_payload,...utils.py
Source:utils.py  
1from xml.etree import ElementTree2from casexml.apps.phone.models import SyncLog3from casexml.apps.phone.xml import SYNC_XMLNS4def synclog_id_from_restore_payload(restore_payload):5    element = ElementTree.fromstring(restore_payload)6    return element.findall('{%s}Sync' % SYNC_XMLNS)[0].findall('{%s}restore_id' % SYNC_XMLNS)[0].text7def synclog_from_restore_payload(restore_payload):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
