How to use restore_payload method in localstack

Best Python code snippet using localstack_python

test_restore.py

Source:test_restore.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2from abc import abstractmethod3import base644from uuid import uuid45import time6import datetime7from mobile_endpoint.backends.manager import get_dao8from mobile_endpoint.case import const9from mobile_endpoint.case.xml import V210from mobile_endpoint.restore import xml11from tests.dummy import dummy_user, dummy_restore_xml12from tests.mock import CaseFactory, CaseStructure, CaseRelationship13from tests.utils import check_xml_line_by_line14DOMAIN = 'test_domain'15class RestoreTestMixin(object):16 user_id = str(uuid4())17 case_id = str(uuid4())18 @abstractmethod19 def _get_backend(self):20 pass21 @abstractmethod22 def _get_all_synclogs(self):23 pass24 @abstractmethod25 def _get_one_synclog(self):26 pass27 @abstractmethod28 def _get_synclog_by_previous_id(self, id):29 pass30 @abstractmethod31 def _get_restore_url_snippet(self):32 pass33 def test_user_restore(self, testapp, client):34 assert 0 == len(self._get_all_synclogs())35 user = dummy_user(self.user_id)36 restore_payload = generate_restore_response(client, DOMAIN, user, self._get_restore_url_snippet())37 synclogs = self._get_all_synclogs()38 assert len(synclogs) == 139 synclog = synclogs[0]40 check_xml_line_by_line(41 dummy_restore_xml(user, synclog.id, items=3),42 restore_payload,43 )44 def test_user_restore_with_case(self, testapp, client):45 with testapp.app_context():46 factory = CaseFactory(47 self._get_backend(),48 client,49 domain=DOMAIN,50 case_defaults={51 'user_id': self.user_id,52 'owner_id': self.user_id,53 'case_type': 'duck',54 }55 )56 new_case, = factory.create_or_update_case(57 CaseStructure(self.case_id, attrs={58 'create': True,59 'case_name': 'Fish',60 'update': {'last_name': 'Mooney'}})61 )62 user = dummy_user(self.user_id)63 restore_payload = generate_restore_response(client, DOMAIN, user, self._get_restore_url_snippet())64 synclog = self._get_one_synclog()65 case_xml = xml.get_case_xml(66 new_case, [67 const.CASE_ACTION_CREATE,68 const.CASE_ACTION_UPDATE],69 version=V2)70 check_xml_line_by_line(71 dummy_restore_xml(user, synclog.id, case_xml=case_xml, items=4),72 restore_payload,73 )74 def test_get_last_modified_dates(self, testapp, client):75 """76 I was having a little trouble manufacturing a scenario that would call77 this Dao.get_last_modified_dates() with meaningful parameters, so I'm78 testing it directly.79 """80 user_id = str(uuid4())81 owner_id = str(uuid4())82 with testapp.app_context():83 factory = CaseFactory(self._get_backend(), client, domain=DOMAIN, case_defaults={84 'user_id': user_id,85 'owner_id': owner_id,86 'case_type': 'duck',87 })88 child, parent = factory.create_or_update_case(89 CaseStructure(90 attrs={'create': True, 'case_type': 'duckling'},91 relationships=[92 CaseRelationship(93 CaseStructure(attrs={'case_type': 'duck'})94 ),95 ])96 )97 ids = [child.id, parent.id]98 dao = get_dao(self._get_backend())99 id_date_map = dao.get_last_modified_dates(DOMAIN, ids)100 assert set(id_date_map.keys()) == set(ids)101 def test_sync_token(self, testapp, client):102 """103 Tests sync token / sync mode support104 """105 self.test_user_restore_with_case(testapp, client)106 user = dummy_user(self.user_id)107 synclog = self._get_one_synclog()108 synclog_id = synclog.id109 restore_payload = generate_restore_response(client, DOMAIN, user, self._get_restore_url_snippet(), since=synclog_id)110 new_synclog = self._get_synclog_by_previous_id(synclog_id)111 new_synclog_id = new_synclog.id112 # should no longer have a case block in the restore XML113 check_xml_line_by_line(114 dummy_restore_xml(user, new_synclog_id, items=3),115 restore_payload,116 )117 time.sleep(1) # current jsonobject doesn't support microseconds in datetime fields118 # update the case119 with testapp.app_context():120 factory = CaseFactory(121 self._get_backend(),122 client,123 domain=DOMAIN,124 case_defaults={125 'user_id': str(uuid4()),126 'owner_id': self.user_id,127 'case_type': 'duck',128 }129 )130 updated_case, = factory.create_or_update_case(131 CaseStructure(self.case_id, attrs={'update': {'occupation': 'restaurant owner'}}),132 )133 new_restore_payload = generate_restore_response(client, DOMAIN, user, self._get_restore_url_snippet(), since=new_synclog_id)134 new_new_synclog = self._get_synclog_by_previous_id(new_synclog_id)135 case_xml = xml.get_case_xml(updated_case, [const.CASE_ACTION_UPDATE], version=V2)136 # case block should come back137 check_xml_line_by_line(138 dummy_restore_xml(user, new_new_synclog.id, case_xml=case_xml, items=4),139 new_restore_payload,140 )141def generate_restore_response(client, domain, user, url_snippet, since=None):142 headers = {'Authorization': 'Basic ' + base64.b64encode('{}:{}'.format(user.username, user.password))}143 result = client.get(144 'ota/{url_snippet}/{domain}?version=2.0&items=true&user_id={user_id}{since}'.format(145 # TODO: I'm sure there is a better way to get different urls for the different backends...146 url_snippet=url_snippet,147 domain=domain,148 user_id=user.user_id,149 since='&since={}'.format(since) if since else ''),150 headers=headers151 )152 assert result.status_code == 200...

Full Screen

Full Screen

test_receiver.py

Source:test_receiver.py Github

copy

Full Screen

...33 )34 [case_block] = factory.create_or_update_cases([35 CaseStructure(attrs=case_attrs),36 ])37 restore_payload = get_restore_payload(config.restore_url, config.domain, dummy_user(user_id))38 synclog_id = synclog_id_from_restore_payload(restore_payload)39 case_xml = case_block.as_string()40 check_xml_line_by_line(41 dummy_restore_xml(dummy_user(user_id), synclog_id, case_xml=case_xml, items=4),42 restore_payload,43 )44 def test_basic_workflow(self, config):45 """46 Sync, create a case47 Verify sync doesn't contain case48 Update case by another user49 Verify sync contains updated case50 """51 user_id = str(uuid4())52 case_id = str(uuid4())53 user = dummy_user(user_id)54 initial_payload = get_restore_payload(config.restore_url, config.domain, user)55 synclog_id = synclog_id_from_restore_payload(56 initial_payload57 )58 # payload should not contain any cases59 check_xml_line_by_line(60 dummy_restore_xml(user, synclog_id, items=3),61 initial_payload,62 )63 factory = CaseFactory(64 config.receiver_url,65 domain=config.domain,66 form_extras={67 'user_id': user_id,68 }69 )70 case_attrs = {71 'create': True,72 'user_id': user_id,73 'owner_id': user_id,74 'case_type': 'gangster',75 'case_name': 'Fish',76 'update': {'last_name': 'Mooney'}77 }78 factory.create_or_update_case(79 CaseStructure(case_id, attrs=case_attrs),80 form_extras={'headers': {'last_sync_token': synclog_id}}81 )82 restore_payload = get_restore_payload(config.restore_url, config.domain, user, since=synclog_id)83 new_synclog_id = synclog_id_from_restore_payload(restore_payload)84 # restore still does not contain case85 check_xml_line_by_line(86 dummy_restore_xml(user, new_synclog_id, items=3),87 restore_payload,88 )89 # update the case90 case_updates = {'cover_job': 'restaurant owner'}91 date_modified = datetime.utcnow()92 factory.create_or_update_case(93 CaseStructure(case_id, attrs={'update': case_updates, 'date_modified': date_modified}),94 form_extras={95 'user_id': user_id,96 # 'headers': {97 # 'last_sync_token': new_synclog_id98 }#}99 )100 restore_payload = get_restore_payload(config.restore_url, config.domain, user, since=new_synclog_id)101 new_new_synclog_id = synclog_id_from_restore_payload(restore_payload)102 case_attrs['create'] = False103 case_attrs['update'].update(case_updates)104 case_block = CaseBlock(case_id, date_modified=date_modified, **case_attrs)105 # restore contain case106 check_xml_line_by_line(107 dummy_restore_xml(user, new_new_synclog_id, case_xml=case_block.as_string(), items=4),108 restore_payload,...

Full Screen

Full Screen

utils.py

Source:utils.py Github

copy

Full Screen

1from xml.etree import ElementTree2from casexml.apps.phone.models import SyncLog3from casexml.apps.phone.xml import SYNC_XMLNS4def synclog_id_from_restore_payload(restore_payload):5 element = ElementTree.fromstring(restore_payload)6 return element.findall('{%s}Sync' % SYNC_XMLNS)[0].findall('{%s}restore_id' % SYNC_XMLNS)[0].text7def synclog_from_restore_payload(restore_payload):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful