How to use call_endpoint method in localstack

Best Python code snippet using localstack_python

__init__.py

Source:__init__.py Github

copy

Full Screen

...535 return json_result536 end_error_retry(False)537 return {'msg': "Unexpected error from the server", "result": "http-error",538 "status_code": res.status_code}539 def call_endpoint(self, url=None, method="POST", request=None,540 longpolling=False, files=None, timeout=None):541 # type: (Optional[str], str, Optional[Dict[str, Any]], bool, Optional[List[IO[Any]]], Optional[float]) -> Dict[str, Any]542 if request is None:543 request = dict()544 marshalled_request = {}545 for (k, v) in request.items():546 if v is not None:547 marshalled_request[k] = v548 versioned_url = API_VERSTRING + (url if url is not None else "")549 return self.do_api_query(marshalled_request, versioned_url, method=method,550 longpolling=longpolling, files=files, timeout=timeout)551 def call_on_each_event(self, callback, event_types=None, narrow=None):552 # type: (Callable[[Dict[str, Any]], None], Optional[List[str]], Optional[List[List[str]]]) -> None553 if narrow is None:554 narrow = []555 def do_register():556 # type: () -> Tuple[str, int]557 while True:558 if event_types is None:559 res = self.register()560 else:561 res = self.register(event_types=event_types, narrow=narrow)562 if 'error' in res['result']:563 if self.verbose:564 print("Server returned error:\n%s" % res['msg'])565 time.sleep(1)566 else:567 return (res['queue_id'], res['last_event_id'])568 queue_id = None569 # Make long-polling requests with `get_events`. Once a request570 # has received an answer, pass it to the callback and before571 # making a new long-polling request.572 while True:573 if queue_id is None:574 (queue_id, last_event_id) = do_register()575 res = self.get_events(queue_id=queue_id, last_event_id=last_event_id)576 if 'error' in res['result']:577 if res["result"] == "http-error":578 if self.verbose:579 print("HTTP error fetching events -- probably a server restart")580 elif res["result"] == "connection-error":581 if self.verbose:582 print("Connection error fetching events -- probably server is temporarily down?")583 else:584 if self.verbose:585 print("Server returned error:\n%s" % res["msg"])586 # Eventually, we'll only want the587 # BAD_EVENT_QUEUE_ID check, but we check for the588 # old string to support legacy Zulip servers. We589 # should remove that legacy check in 2019.590 if res.get("code") == "BAD_EVENT_QUEUE_ID" or res["msg"].startswith("Bad event queue id:"):591 # Our event queue went away, probably because592 # we were asleep or the server restarted593 # abnormally. We may have missed some594 # events while the network was down or595 # something, but there's not really anything596 # we can do about it other than resuming597 # getting new ones.598 #599 # Reset queue_id to register a new event queue.600 queue_id = None601 # Add a pause here to cover against potential bugs in this library602 # causing a DoS attack against a server when getting errors.603 # TODO: Make this back off exponentially.604 time.sleep(1)605 continue606 for event in res['events']:607 last_event_id = max(last_event_id, int(event['id']))608 callback(event)609 def call_on_each_message(self, callback):610 # type: (Callable[[Dict[str, Any]], None]) -> None611 def event_callback(event):612 # type: (Dict[str, Any]) -> None613 if event['type'] == 'message':614 callback(event['message'])615 self.call_on_each_event(event_callback, ['message'])616 def get_messages(self, message_filters):617 # type: (Dict[str, Any]) -> Dict[str, Any]618 '''619 See examples/get-messages for example usage620 '''621 return self.call_endpoint(622 url='messages',623 method='GET',624 request=message_filters625 )626 def get_raw_message(self, message_id):627 # type: (int) -> Dict[str, str]628 '''629 See examples/get-raw-message for example usage630 '''631 return self.call_endpoint(632 url='messages/{}'.format(message_id),633 method='GET'634 )635 def send_message(self, message_data):636 # type: (Dict[str, Any]) -> Dict[str, Any]637 '''638 See examples/send-message for example usage.639 '''640 return self.call_endpoint(641 url='messages',642 request=message_data,643 )644 def upload_file(self, file):645 # type: (IO[Any]) -> Dict[str, Any]646 '''647 See examples/upload-file for example usage.648 '''649 return self.call_endpoint(650 url='user_uploads',651 files=[file]652 )653 def update_message(self, message_data):654 # type: (Dict[str, Any]) -> Dict[str, Any]655 '''656 See examples/edit-message for example usage.657 '''658 return self.call_endpoint(659 url='messages/%d' % (message_data['message_id'],),660 method='PATCH',661 request=message_data,662 )663 def delete_message(self, message_id):664 # type: (int) -> Dict[str, Any]665 '''666 See examples/delete-message for example usage.667 '''668 return self.call_endpoint(669 url='messages/{}'.format(message_id),670 method='DELETE'671 )672 def update_message_flags(self, update_data):673 # type: (Dict[str, Any]) -> Dict[str, Any]674 '''675 See examples/update-flags for example usage.676 '''677 return self.call_endpoint(678 url='messages/flags',679 method='POST',680 request=update_data681 )682 def mark_all_as_read(self):683 # type: () -> Dict[str, Any]684 '''685 Example usage:686 >>> client.mark_all_as_read()687 {'result': 'success', 'msg': ''}688 '''689 return self.call_endpoint(690 url='mark_all_as_read',691 method='POST',692 )693 def mark_stream_as_read(self, stream_id):694 # type: (int) -> Dict[str, Any]695 '''696 Example usage:697 >>> client.mark_stream_as_read(42)698 {'result': 'success', 'msg': ''}699 '''700 return self.call_endpoint(701 url='mark_stream_as_read',702 method='POST',703 request={'stream_id': stream_id},704 )705 def mark_topic_as_read(self, stream_id, topic_name):706 # type: (int, str) -> Dict[str, Any]707 '''708 Example usage:709 >>> client.mark_all_as_read(42, 'new coffee machine')710 {'result': 'success', 'msg': ''}711 '''712 return self.call_endpoint(713 url='mark_topic_as_read',714 method='POST',715 request={716 'stream_id': stream_id,717 'topic_name': topic_name,718 },719 )720 def get_message_history(self, message_id):721 # type: (int) -> Dict[str, Any]722 '''723 See examples/message-history for example usage.724 '''725 return self.call_endpoint(726 url='messages/{}/history'.format(message_id),727 method='GET'728 )729 def add_reaction(self, reaction_data):730 # type: (Dict[str, str]) -> Dict[str, Any]731 '''732 Example usage:733 >>> client.add_emoji_reaction({734 'message_id': '100',735 'emoji_name': 'joy',736 'emoji_code': '1f602',737 'emoji_type': 'unicode_emoji'738 })739 {'result': 'success', 'msg': ''}740 '''741 return self.call_endpoint(742 url='messages/{}/reactions'.format(reaction_data['message_id']),743 method='POST',744 )745 def remove_reaction(self, reaction_data):746 # type: (Dict[str, str]) -> Dict[str, Any]747 '''748 Example usage:749 >>> client.remove_reaction({750 'message_id': '100',751 'emoji_name': 'joy',752 'emoji_code': '1f602',753 'emoji_type': 'unicode_emoji'754 })755 {'msg': '', 'result': 'success'}756 '''757 return self.call_endpoint(758 url='messages/{}/reactions'.format(reaction_data['message_id']),759 method='DELETE',760 request=reaction_data,761 )762 def get_realm_emoji(self):763 # type: () -> Dict[str, Any]764 '''765 See examples/realm-emoji for example usage.766 '''767 return self.call_endpoint(768 url='realm/emoji',769 method='GET'770 )771 def upload_custom_emoji(self, emoji_name, file_obj):772 # type: (str, IO[Any]) -> Dict[str, Any]773 '''774 Example usage:775 >>> client.upload_custom_emoji(emoji_name, file_obj)776 {'result': 'success', 'msg': ''}777 '''778 return self.call_endpoint(779 'realm/emoji/{}'.format(emoji_name),780 method='POST',781 files=[file_obj]782 )783 def get_realm_filters(self):784 # type: () -> Dict[str, Any]785 '''786 Example usage:787 >>> client.get_realm_filters()788 {'result': 'success', 'msg': '', 'filters': [['#(?P<id>[0-9]+)', 'https://github.com/zulip/zulip/issues/%(id)s', 1]]}789 '''790 return self.call_endpoint(791 url='realm/filters',792 method='GET',793 )794 def add_realm_filter(self, pattern, url_format_string):795 # type: (str, str) -> Dict[str, Any]796 '''797 Example usage:798 >>> client.add_realm_filter('#(?P<id>[0-9]+)', 'https://github.com/zulip/zulip/issues/%(id)s')799 {'result': 'success', 'msg': '', 'id': 42}800 '''801 return self.call_endpoint(802 url='realm/filters',803 method='POST',804 request={805 'pattern': pattern,806 'url_format_string': url_format_string,807 },808 )809 def remove_realm_filter(self, filter_id):810 # type: (int) -> Dict[str, Any]811 '''812 Example usage:813 >>> client.remove_realm_filter(42)814 {'result': 'success', 'msg': ''}815 '''816 return self.call_endpoint(817 url='realm/filters/{}'.format(filter_id),818 method='DELETE',819 )820 def get_server_settings(self):821 # type: () -> Dict[str, Any]822 '''823 Example usage:824 >>> client.get_server_settings()825 {'msg': '', 'result': 'success', 'zulip_version': '1.9.0', 'push_notifications_enabled': False, ...}826 '''827 return self.call_endpoint(828 url='server_settings',829 method='GET',830 )831 def get_events(self, **request):832 # type: (**Any) -> Dict[str, Any]833 '''834 See the register() method for example usage.835 '''836 return self.call_endpoint(837 url='events',838 method='GET',839 longpolling=True,840 request=request,841 )842 def register(self, event_types=None, narrow=None, **kwargs):843 # type: (Optional[Iterable[str]], Optional[List[List[str]]], **Any) -> Dict[str, Any]844 '''845 Example usage:846 >>> client.register(['message'])847 {u'msg': u'', u'max_message_id': 112, u'last_event_id': -1, u'result': u'success', u'queue_id': u'1482093786:2'}848 >>> client.get_events(queue_id='1482093786:2', last_event_id=0)849 {...}850 '''851 if narrow is None:852 narrow = []853 request = dict(854 event_types=event_types,855 narrow=narrow,856 **kwargs857 )858 return self.call_endpoint(859 url='register',860 request=request,861 )862 def deregister(self, queue_id, timeout=None):863 # type: (str, Optional[float]) -> Dict[str, Any]864 '''865 Example usage:866 >>> client.register(['message'])867 {u'msg': u'', u'max_message_id': 113, u'last_event_id': -1, u'result': u'success', u'queue_id': u'1482093786:3'}868 >>> client.deregister('1482093786:3')869 {u'msg': u'', u'result': u'success'}870 '''871 request = dict(queue_id=queue_id)872 return self.call_endpoint(873 url="events",874 method="DELETE",875 request=request,876 timeout=timeout,877 )878 def get_profile(self, request=None):879 # type: (Optional[Dict[str, Any]]) -> Dict[str, Any]880 '''881 Example usage:882 >>> client.get_profile()883 {u'user_id': 5, u'full_name': u'Iago', u'short_name': u'iago', ...}884 '''885 return self.call_endpoint(886 url='users/me',887 method='GET',888 request=request,889 )890 def get_user_presence(self, email):891 # type: (Dict[str, Any]) -> Dict[str, Any]892 '''893 Example usage:894 >>> client.get_user_presence('iago@zulip.com')895 {'presence': {'website': {'timestamp': 1486799122, 'status': 'active'}}, 'result': 'success', 'msg': ''}896 '''897 return self.call_endpoint(898 url='users/%s/presence' % (email,),899 method='GET',900 )901 def update_presence(self, request):902 # type: (Dict[str, Any]) -> Dict[str, Any]903 '''904 Example usage:905 >>> client.update_presence({906 status='active',907 ping_only=False,908 new_user_input=False,909 })910 {'result': 'success', 'server_timestamp': 1333649180.7073195, 'presences': {'iago@zulip.com': { ... }}, 'msg': ''}911 '''912 return self.call_endpoint(913 url='users/me/presence',914 method='POST',915 request=request,916 )917 def get_streams(self, **request):918 # type: (**Any) -> Dict[str, Any]919 '''920 See examples/get-public-streams for example usage.921 '''922 return self.call_endpoint(923 url='streams',924 method='GET',925 request=request,926 )927 def update_stream(self, stream_data):928 # type: (Dict[str, Any]) -> Dict[str, Any]929 '''930 See examples/edit-stream for example usage.931 '''932 return self.call_endpoint(933 url='streams/{}'.format(stream_data['stream_id']),934 method='PATCH',935 request=stream_data,936 )937 def delete_stream(self, stream_id):938 # type: (int) -> Dict[str, Any]939 '''940 See examples/delete-stream for example usage.941 '''942 return self.call_endpoint(943 url='streams/{}'.format(stream_id),944 method='DELETE',945 )946 def get_members(self, request=None):947 # type: (Optional[Dict[str, Any]]) -> Dict[str, Any]948 '''949 See examples/list-members for example usage.950 '''951 return self.call_endpoint(952 url='users',953 method='GET',954 request=request,955 )956 def get_alert_words(self):957 # type: () -> Dict[str, Any]958 '''959 See examples/alert-words for example usage.960 '''961 return self.call_endpoint(962 url='users/me/alert_words',963 method='GET'964 )965 def add_alert_words(self, alert_words):966 # type: (List[str]) -> Dict[str, Any]967 '''968 See examples/alert-words for example usage.969 '''970 return self.call_endpoint(971 url='users/me/alert_words',972 method='POST',973 request={974 'alert_words': alert_words975 }976 )977 def remove_alert_words(self, alert_words):978 # type: (List[str]) -> Dict[str, Any]979 '''980 See examples/alert-words for example usage.981 '''982 return self.call_endpoint(983 url='users/me/alert_words',984 method='DELETE',985 request={986 'alert_words': alert_words987 }988 )989 def list_subscriptions(self, request=None):990 # type: (Optional[Dict[str, Any]]) -> Dict[str, Any]991 '''992 See examples/list-subscriptions for example usage.993 '''994 return self.call_endpoint(995 url='users/me/subscriptions',996 method='GET',997 request=request,998 )999 def add_subscriptions(self, streams, **kwargs):1000 # type: (Iterable[Dict[str, Any]], **Any) -> Dict[str, Any]1001 '''1002 See examples/subscribe for example usage.1003 '''1004 request = dict(1005 subscriptions=streams,1006 **kwargs1007 )1008 return self.call_endpoint(1009 url='users/me/subscriptions',1010 request=request,1011 )1012 def remove_subscriptions(self, streams, principals=None):1013 # type: (Iterable[str], Optional[Iterable[str]]) -> Dict[str, Any]1014 '''1015 See examples/unsubscribe for example usage.1016 '''1017 if principals is None:1018 principals = []1019 request = dict(1020 subscriptions=streams,1021 principals=principals1022 )1023 return self.call_endpoint(1024 url='users/me/subscriptions',1025 method='DELETE',1026 request=request,1027 )1028 def mute_topic(self, request):1029 # type: (Dict[str, Any]) -> Dict[str, Any]1030 '''1031 See examples/mute-topic for example usage.1032 '''1033 return self.call_endpoint(1034 url='users/me/subscriptions/muted_topics',1035 method='PATCH',1036 request=request1037 )1038 def update_subscription_settings(self, subscription_data):1039 # type: (List[Dict[str, Any]]) -> Dict[str, Any]1040 '''1041 Example usage:1042 >>> client.update_subscription_settings([{1043 'stream_id': 1,1044 'property': 'pin_to_top',1045 'value': True1046 },1047 {1048 'stream_id': 3,1049 'property': 'color',1050 'value': 'f00'1051 }])1052 {'result': 'success', 'msg': '', 'subscription_data': [{...}, {...}]}1053 '''1054 return self.call_endpoint(1055 url='users/me/subscriptions/properties',1056 method='POST',1057 request={'subscription_data': subscription_data}1058 )1059 def update_notification_settings(self, notification_settings):1060 # type: (Dict[str, Any]) -> Dict[str, Any]1061 '''1062 Example usage:1063 >>> client.update_notification_settings({1064 'enable_stream_push_notifications': True,1065 'enable_offline_push_notifications': False,1066 })1067 {'enable_offline_push_notifications': False, 'enable_stream_push_notifications': True, 'msg': '', 'result': 'success'}1068 '''1069 return self.call_endpoint(1070 url='settings/notifications',1071 method='PATCH',1072 request=notification_settings,1073 )1074 def get_stream_id(self, stream):1075 # type: (str) -> Dict[str, Any]1076 '''1077 Example usage: client.get_stream_id('devel')1078 '''1079 stream_encoded = urllib.parse.quote(stream, safe='')1080 url = 'get_stream_id?stream=%s' % (stream_encoded,)1081 return self.call_endpoint(1082 url=url,1083 method='GET',1084 request=None,1085 )1086 def get_stream_topics(self, stream_id):1087 # type: (int) -> Dict[str, Any]1088 '''1089 See examples/get-stream-topics for example usage.1090 '''1091 return self.call_endpoint(1092 url='users/me/{}/topics'.format(stream_id),1093 method='GET'1094 )1095 def get_user_groups(self):1096 # type: () -> Dict[str, Any]1097 '''1098 Example usage:1099 >>> client.get_user_groups()1100 {'result': 'success', 'msg': '', 'user_groups': [{...}, {...}]}1101 '''1102 return self.call_endpoint(1103 url='user_groups',1104 method='GET',1105 )1106 def create_user_group(self, group_data):1107 # type: (Dict[str, Any]) -> Dict[str, Any]1108 '''1109 Example usage:1110 >>> client.create_user_group({1111 'name': 'marketing',1112 'description': "Members of ACME Corp.'s marketing team.",1113 'members': [4, 8, 15, 16, 23, 42],1114 })1115 {'msg': '', 'result': 'success'}1116 '''1117 return self.call_endpoint(1118 url='user_groups/create',1119 method='POST',1120 request=group_data,1121 )1122 def update_user_group(self, group_data):1123 # type: (Dict[str, Any]) -> Dict[str, Any]1124 '''1125 Example usage:1126 >>> client.update_user_group({1127 'group_id': 1,1128 'name': 'marketing',1129 'description': "Members of ACME Corp.'s marketing team.",1130 })1131 {'description': 'Description successfully updated.', 'name': 'Name successfully updated.', 'result': 'success', 'msg': ''}1132 '''1133 return self.call_endpoint(1134 url='user_groups/{}'.format(group_data['group_id']),1135 method='PATCH',1136 request=group_data,1137 )1138 def remove_user_group(self, group_id):1139 # type: (int) -> Dict[str, Any]1140 '''1141 Example usage:1142 >>> client.remove_user_group(42)1143 {'msg': '', 'result': 'success'}1144 '''1145 return self.call_endpoint(1146 url='user_groups/{}'.format(group_id),1147 method='DELETE',1148 )1149 def update_user_group_members(self, group_data):1150 # type: (Dict[str, Any]) -> Dict[str, Any]1151 '''1152 Example usage:1153 >>> client.update_user_group_members({1154 'delete': [4, 8, 15],1155 'add': [16, 23, 42],1156 })1157 {'msg': '', 'result': 'success'}1158 '''1159 return self.call_endpoint(1160 url='user_groups/{}/members'.format(group_data['group_id']),1161 method='POST',1162 request=group_data,1163 )1164 def get_subscribers(self, **request):1165 # type: (**Any) -> Dict[str, Any]1166 '''1167 Example usage: client.get_subscribers(stream='devel')1168 '''1169 response = self.get_stream_id(request['stream'])1170 if response['result'] == 'error':1171 return response1172 stream_id = response['stream_id']1173 url = 'streams/%d/members' % (stream_id,)1174 return self.call_endpoint(1175 url=url,1176 method='GET',1177 request=request,1178 )1179 def render_message(self, request=None):1180 # type: (Optional[Dict[str, Any]]) -> Dict[str, Any]1181 '''1182 Example usage:1183 >>> client.render_message(request=dict(content='foo **bar**'))1184 {u'msg': u'', u'rendered': u'<p>foo <strong>bar</strong></p>', u'result': u'success'}1185 '''1186 return self.call_endpoint(1187 url='messages/render',1188 method='POST',1189 request=request,1190 )1191 def create_user(self, request=None):1192 # type: (Optional[Dict[str, Any]]) -> Dict[str, Any]1193 '''1194 See examples/create-user for example usage.1195 '''1196 return self.call_endpoint(1197 method='POST',1198 url='users',1199 request=request,1200 )1201 def update_storage(self, request):1202 # type: (Dict[str, Any]) -> Dict[str, Any]1203 '''1204 Example usage:1205 >>> client.update_storage({'storage': {"entry 1": "value 1", "entry 2": "value 2", "entry 3": "value 3"}})1206 >>> client.get_storage({'keys': ["entry 1", "entry 3"]})1207 {'result': 'success', 'storage': {'entry 1': 'value 1', 'entry 3': 'value 3'}, 'msg': ''}1208 '''1209 return self.call_endpoint(1210 url='bot_storage',1211 method='PUT',1212 request=request,1213 )1214 def get_storage(self, request=None):1215 # type: (Optional[Dict[str, Any]]) -> Dict[str, Any]1216 '''1217 Example usage:1218 >>> client.update_storage({'storage': {"entry 1": "value 1", "entry 2": "value 2", "entry 3": "value 3"}})1219 >>> client.get_storage()1220 {'result': 'success', 'storage': {"entry 1": "value 1", "entry 2": "value 2", "entry 3": "value 3"}, 'msg': ''}1221 >>> client.get_storage({'keys': ["entry 1", "entry 3"]})1222 {'result': 'success', 'storage': {'entry 1': 'value 1', 'entry 3': 'value 3'}, 'msg': ''}1223 '''1224 return self.call_endpoint(1225 url='bot_storage',1226 method='GET',1227 request=request,1228 )1229 def set_typing_status(self, request):1230 # type: (Dict[str, Any]) -> Dict[str, Any]1231 '''1232 Example usage:1233 >>> client.set_typing_status({1234 'op': 'start',1235 'to': ['iago@zulip.com', 'polonius@zulip.com'],1236 })1237 {'result': 'success', 'msg': ''}1238 '''1239 return self.call_endpoint(1240 url='typing',1241 method='POST',1242 request=request1243 )1244class ZulipStream(object):1245 """1246 A Zulip stream-like object1247 """1248 def __init__(self, type, to, subject, **kwargs):1249 # type: (str, str, str, **Any) -> None1250 self.client = Client(**kwargs)1251 self.type = type1252 self.to = to1253 self.subject = subject...

Full Screen

Full Screen

test_relay_projectconfigs.py

Source:test_relay_projectconfigs.py Github

copy

Full Screen

...36@pytest.fixture(autouse=True)37def setup_relay(default_project):38 default_project.update_option("sentry:scrub_ip_address", True)39@pytest.fixture40def call_endpoint(client, relay, private_key, default_project):41 def inner(full_config, projects=None):42 path = reverse("sentry-api-0-relay-projectconfigs")43 if projects is None:44 projects = [six.text_type(default_project.id)]45 if full_config is None:46 raw_json, signature = private_key.pack({"projects": projects})47 else:48 raw_json, signature = private_key.pack(49 {"projects": projects, "fullConfig": full_config}50 )51 resp = client.post(52 path,53 data=raw_json,54 content_type="application/json",55 HTTP_X_SENTRY_RELAY_ID=relay.relay_id,56 HTTP_X_SENTRY_RELAY_SIGNATURE=signature,57 )58 return json.loads(resp.content), resp.status_code59 return inner60@pytest.fixture61def add_org_key(default_organization, relay):62 default_organization.update_option("sentry:trusted-relays", [relay.public_key])63@pytest.mark.django_db64def test_internal_relays_should_receive_minimal_configs_if_they_do_not_explicitly_ask_for_full_config(65 call_endpoint, default_project66):67 result, status_code = call_endpoint(full_config=False)68 assert status_code < 40069 # Sweeping assertion that we do not have any snake_case in that config.70 # Might need refining.71 assert not set(x for x in _get_all_keys(result) if "-" in x or "_" in x)72 cfg = safe.get_path(result, "configs", six.text_type(default_project.id))73 assert safe.get_path(cfg, "config", "filterSettings") is None74 assert safe.get_path(cfg, "config", "groupingConfig") is None75@pytest.mark.django_db76def test_internal_relays_should_receive_full_configs(77 call_endpoint, default_project, default_projectkey78):79 result, status_code = call_endpoint(full_config=True)80 assert status_code < 40081 # Sweeping assertion that we do not have any snake_case in that config.82 # Might need refining.83 assert not set(x for x in _get_all_keys(result) if "-" in x or "_" in x)84 cfg = safe.get_path(result, "configs", six.text_type(default_project.id))85 assert safe.get_path(cfg, "disabled") is False86 public_key, = cfg["publicKeys"]87 assert public_key["publicKey"] == default_projectkey.public_key88 assert public_key["isEnabled"]89 assert "quotas" in public_key90 assert safe.get_path(cfg, "slug") == default_project.slug91 last_change = safe.get_path(cfg, "lastChange")92 assert _date_regex.match(last_change) is not None93 last_fetch = safe.get_path(cfg, "lastFetch")94 assert _date_regex.match(last_fetch) is not None95 assert safe.get_path(cfg, "organizationId") == default_project.organization.id96 assert safe.get_path(cfg, "projectId") == default_project.id97 assert safe.get_path(cfg, "slug") == default_project.slug98 assert safe.get_path(cfg, "rev") is not None99 assert safe.get_path(cfg, "config", "trustedRelays") == []100 assert safe.get_path(cfg, "config", "filterSettings") is not None101 assert safe.get_path(cfg, "config", "groupingConfig", "enhancements") is not None102 assert safe.get_path(cfg, "config", "groupingConfig", "id") is not None103 assert safe.get_path(cfg, "config", "piiConfig", "applications") is None104 assert safe.get_path(cfg, "config", "piiConfig", "rules") is None105 assert safe.get_path(cfg, "config", "datascrubbingSettings", "scrubData") is True106 assert safe.get_path(cfg, "config", "datascrubbingSettings", "scrubDefaults") is True107 assert safe.get_path(cfg, "config", "datascrubbingSettings", "scrubIpAddresses") is True108 assert safe.get_path(cfg, "config", "datascrubbingSettings", "sensitiveFields") == []109@pytest.mark.django_db110def test_trusted_external_relays_should_not_be_able_to_request_full_configs(111 add_org_key, relay, call_endpoint112):113 relay.is_internal = False114 relay.save()115 result, status_code = call_endpoint(full_config=True)116 assert status_code == 403117@pytest.mark.django_db118def test_when_not_sending_full_config_info_into_a_internal_relay_a_restricted_config_is_returned(119 call_endpoint, default_project120):121 result, status_code = call_endpoint(full_config=None)122 assert status_code < 400123 cfg = safe.get_path(result, "configs", six.text_type(default_project.id))124 assert safe.get_path(cfg, "config", "filterSettings") is None125 assert safe.get_path(cfg, "config", "groupingConfig") is None126@pytest.mark.django_db127def test_when_not_sending_full_config_info_into_an_external_relay_a_restricted_config_is_returned(128 call_endpoint, add_org_key, relay, default_project129):130 relay.is_internal = False131 relay.save()132 result, status_code = call_endpoint(full_config=None)133 assert status_code < 400134 cfg = safe.get_path(result, "configs", six.text_type(default_project.id))135 assert safe.get_path(cfg, "config", "filterSettings") is None136 assert safe.get_path(cfg, "config", "groupingConfig") is None137@pytest.mark.django_db138def test_trusted_external_relays_should_receive_minimal_configs(139 relay, add_org_key, call_endpoint, default_project, default_projectkey140):141 relay.is_internal = False142 relay.save()143 result, status_code = call_endpoint(full_config=False)144 assert status_code < 400145 cfg = safe.get_path(result, "configs", six.text_type(default_project.id))146 assert safe.get_path(cfg, "disabled") is False147 public_key, = cfg["publicKeys"]148 assert public_key["publicKey"] == default_projectkey.public_key149 assert public_key["isEnabled"]150 assert "quotas" not in public_key151 assert safe.get_path(cfg, "slug") == default_project.slug152 last_change = safe.get_path(cfg, "lastChange")153 assert _date_regex.match(last_change) is not None154 last_fetch = safe.get_path(cfg, "lastFetch")155 assert _date_regex.match(last_fetch) is not None156 assert safe.get_path(cfg, "projectId") == default_project.id157 assert safe.get_path(cfg, "slug") == default_project.slug158 assert safe.get_path(cfg, "rev") is not None159 assert safe.get_path(cfg, "organizationId") is None160 assert safe.get_path(cfg, "config", "trustedRelays") == [relay.public_key]161 assert safe.get_path(cfg, "config", "filterSettings") is None162 assert safe.get_path(cfg, "config", "groupingConfig") is None163 assert safe.get_path(cfg, "config", "datascrubbingSettings", "scrubData") is not None164 assert safe.get_path(cfg, "config", "datascrubbingSettings", "scrubIpAddresses") is not None165 assert safe.get_path(cfg, "config", "piiConfig", "rules") is None166 assert safe.get_path(cfg, "config", "piiConfig", "applications") is None167@pytest.mark.django_db168def test_untrusted_external_relays_should_not_receive_configs(169 relay, call_endpoint, default_project170):171 relay.is_internal = False172 relay.save()173 result, status_code = call_endpoint(full_config=False)174 assert status_code < 400175 cfg = result["configs"][six.text_type(default_project.id)]176 assert cfg["disabled"]177@pytest.fixture178def projectconfig_cache_set(monkeypatch):179 calls = []180 monkeypatch.setattr("sentry.relay.projectconfig_cache.set_many", calls.append)181 return calls182@pytest.mark.django_db183def test_relay_projectconfig_cache_minimal_config(184 call_endpoint, default_project, projectconfig_cache_set, task_runner185):186 """187 When a relay fetches a minimal config, that config should not end up in Redis.188 """189 with task_runner():190 result, status_code = call_endpoint(full_config=False)191 assert status_code < 400192 assert not projectconfig_cache_set193@pytest.mark.django_db194def test_relay_projectconfig_cache_full_config(195 call_endpoint, default_project, projectconfig_cache_set, task_runner196):197 """198 When a relay fetches a full config, that config should end up in Redis.199 """200 with task_runner():201 result, status_code = call_endpoint(full_config=True)202 assert status_code < 400203 http_cfg, = six.itervalues(result["configs"])204 call, = projectconfig_cache_set205 assert len(call) == 1206 redis_cfg = call[six.text_type(default_project.id)]207 del http_cfg["lastFetch"]208 del http_cfg["lastChange"]209 del redis_cfg["lastFetch"]210 del redis_cfg["lastChange"]211 assert redis_cfg == http_cfg212@pytest.mark.django_db213def test_relay_nonexistent_project(call_endpoint, projectconfig_cache_set, task_runner):214 wrong_id = max(p.id for p in Project.objects.all()) + 1215 with task_runner():216 result, status_code = call_endpoint(full_config=True, projects=[wrong_id])217 assert status_code < 400218 http_cfg, = six.itervalues(result["configs"])219 assert http_cfg == {"disabled": True}...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful