How to use call_moto method in localstack

Best Python code snippet using localstack_python

provider.py

Source: provider.py (GitHub)

copy

Full Screen

...879 # request_payer: RequestPayer = None,880 # expected_bucket_owner: AccountId = None,881 # ) -> AbortMultipartUploadOutput:882 # self._transform_request_context(context)883 # return call_moto(context)884 # def complete_multipart_upload(885 # self,886 # context: RequestContext,887 # bucket: BucketName,888 # key: ObjectKey,889 # upload_id: MultipartUploadId,890 # multipart_upload: CompletedMultipartUpload = None,891 # request_payer: RequestPayer = None,892 # expected_bucket_owner: AccountId = None,893 # ) -> CompleteMultipartUploadOutput:894 # self._transform_request_context(context)895 # return call_moto(context)896 def copy_object(897 self,898 context: RequestContext,899 bucket: BucketName,900 copy_source: CopySource,901 key: ObjectKey,902 acl: ObjectCannedACL = None,903 cache_control: CacheControl = None,904 content_disposition: ContentDisposition = None,905 content_encoding: ContentEncoding = None,906 content_language: ContentLanguage = None,907 content_type: ContentType = None,908 copy_source_if_match: CopySourceIfMatch = None,909 copy_source_if_modified_since: CopySourceIfModifiedSince = None,910 copy_source_if_none_match: CopySourceIfNoneMatch = None,911 copy_source_if_unmodified_since: CopySourceIfUnmodifiedSince = None,912 expires: Expires = None,913 grant_full_control: GrantFullControl = None,914 grant_read: GrantRead = None,915 grant_read_acp: GrantReadACP = None,916 grant_write_acp: GrantWriteACP = None,917 metadata: Metadata = None,918 metadata_directive: MetadataDirective = None,919 tagging_directive: TaggingDirective = None,920 server_side_encryption: ServerSideEncryption = None,921 storage_class: StorageClass = None,922 website_redirect_location: WebsiteRedirectLocation = None,923 sse_customer_algorithm: SSECustomerAlgorithm = None,924 sse_customer_key: SSECustomerKey = None,925 sse_customer_key_md5: SSECustomerKeyMD5 = None,926 ssekms_key_id: SSEKMSKeyId = None,927 ssekms_encryption_context: SSEKMSEncryptionContext = None,928 bucket_key_enabled: 
BucketKeyEnabled = None,929 copy_source_sse_customer_algorithm: CopySourceSSECustomerAlgorithm = None,930 copy_source_sse_customer_key: CopySourceSSECustomerKey = None,931 copy_source_sse_customer_key_md5: CopySourceSSECustomerKeyMD5 = None,932 request_payer: RequestPayer = None,933 tagging: TaggingHeader = None,934 object_lock_mode: ObjectLockMode = None,935 object_lock_retain_until_date: ObjectLockRetainUntilDate = None,936 object_lock_legal_hold_status: ObjectLockLegalHoldStatus = None,937 expected_bucket_owner: AccountId = None,938 expected_source_bucket_owner: AccountId = None,939 ) -> CopyObjectOutput:940 self._transform_request_context(context)941 self._transform_content_types(context)942 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)943 return call_moto(context)944 def create_bucket(945 self,946 context: RequestContext,947 bucket: BucketName,948 acl: BucketCannedACL = None,949 create_bucket_configuration: CreateBucketConfiguration = None,950 grant_full_control: GrantFullControl = None,951 grant_read: GrantRead = None,952 grant_read_acp: GrantReadACP = None,953 grant_write: GrantWrite = None,954 grant_write_acp: GrantWriteACP = None,955 object_lock_enabled_for_bucket: ObjectLockEnabledForBucket = None,956 object_ownership: ObjectOwnership = None,957 ) -> CreateBucketOutput:958 self._transform_request_context(context)959 self._transform_content_types(context)960 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)961 # Moto doesn't allow to put a location constraint on 'us-east-1'.962 location_constraint = create_bucket_configuration.get("LocationConstraint", None)963 if location_constraint == AWS_REGION_US_EAST_1:964 # Drop CreateBucketConfiguration bindings, inject compulsory fields, retail all others.965 binds_bs = context.request.data966 binds = xmltodict.parse(binds_bs)967 #968 binds["Bucket"] = bucket969 del binds["CreateBucketConfiguration"]970 #971 return call_moto_with_request(context, binds)972 return 
call_moto(context)973 def create_multipart_upload(974 self,975 context: RequestContext,976 bucket: BucketName,977 key: ObjectKey,978 acl: ObjectCannedACL = None,979 cache_control: CacheControl = None,980 content_disposition: ContentDisposition = None,981 content_encoding: ContentEncoding = None,982 content_language: ContentLanguage = None,983 content_type: ContentType = None,984 expires: Expires = None,985 grant_full_control: GrantFullControl = None,986 grant_read: GrantRead = None,987 grant_read_acp: GrantReadACP = None,988 grant_write_acp: GrantWriteACP = None,989 metadata: Metadata = None,990 server_side_encryption: ServerSideEncryption = None,991 storage_class: StorageClass = None,992 website_redirect_location: WebsiteRedirectLocation = None,993 sse_customer_algorithm: SSECustomerAlgorithm = None,994 sse_customer_key: SSECustomerKey = None,995 sse_customer_key_md5: SSECustomerKeyMD5 = None,996 ssekms_key_id: SSEKMSKeyId = None,997 ssekms_encryption_context: SSEKMSEncryptionContext = None,998 bucket_key_enabled: BucketKeyEnabled = None,999 request_payer: RequestPayer = None,1000 tagging: TaggingHeader = None,1001 object_lock_mode: ObjectLockMode = None,1002 object_lock_retain_until_date: ObjectLockRetainUntilDate = None,1003 object_lock_legal_hold_status: ObjectLockLegalHoldStatus = None,1004 expected_bucket_owner: AccountId = None,1005 ) -> CreateMultipartUploadOutput:1006 self._transform_request_context(context)1007 return call_moto(context)1008 # def delete_bucket(1009 # self,1010 # context: RequestContext,1011 # bucket: BucketName,1012 # expected_bucket_owner: AccountId = None,1013 # ) -> None:1014 # self._transform_request_context(context)1015 # call_moto(context)1016 # def delete_bucket_analytics_configuration(1017 # self,1018 # context: RequestContext,1019 # bucket: BucketName,1020 # id: AnalyticsId,1021 # expected_bucket_owner: AccountId = None,1022 # ) -> None:1023 # self._transform_request_context(context)1024 # call_moto(context)1025 # def 
delete_bucket_cors(1026 # self,1027 # context: RequestContext,1028 # bucket: BucketName,1029 # expected_bucket_owner: AccountId = None,1030 # ) -> None:1031 # self._transform_request_context(context)1032 # call_moto(context)1033 # def delete_bucket_encryption(1034 # self,1035 # context: RequestContext,1036 # bucket: BucketName,1037 # expected_bucket_owner: AccountId = None,1038 # ) -> None:1039 # self._transform_request_context(context)1040 # call_moto(context)1041 # def delete_bucket_intelligent_tiering_configuration(1042 # self, context: RequestContext, bucket: BucketName, id: IntelligentTieringId1043 # ) -> None:1044 # self._transform_request_context(context)1045 # call_moto(context)1046 # def delete_bucket_inventory_configuration(1047 # self,1048 # context: RequestContext,1049 # bucket: BucketName,1050 # id: InventoryId,1051 # expected_bucket_owner: AccountId = None,1052 # ) -> None:1053 # self._transform_request_context(context)1054 # call_moto(context)1055 # def delete_bucket_lifecycle(1056 # self,1057 # context: RequestContext,1058 # bucket: BucketName,1059 # expected_bucket_owner: AccountId = None,1060 # ) -> None:1061 # self._transform_request_context(context)1062 # call_moto(context)1063 # def delete_bucket_metrics_configuration(1064 # self,1065 # context: RequestContext,1066 # bucket: BucketName,1067 # id: MetricsId,1068 # expected_bucket_owner: AccountId = None,1069 # ) -> None:1070 # self._transform_request_context(context)1071 # call_moto(context)1072 # def delete_bucket_ownership_controls(1073 # self,1074 # context: RequestContext,1075 # bucket: BucketName,1076 # expected_bucket_owner: AccountId = None,1077 # ) -> None:1078 # self._transform_request_context(context)1079 # call_moto(context)1080 # def delete_bucket_policy(1081 # self,1082 # context: RequestContext,1083 # bucket: BucketName,1084 # expected_bucket_owner: AccountId = None,1085 # ) -> None:1086 # self._transform_request_context(context)1087 # call_moto(context)1088 # def 
delete_bucket_replication(1089 # self,1090 # context: RequestContext,1091 # bucket: BucketName,1092 # expected_bucket_owner: AccountId = None,1093 # ) -> None:1094 # self._transform_request_context(context)1095 # call_moto(context)1096 # def delete_bucket_tagging(1097 # self,1098 # context: RequestContext,1099 # bucket: BucketName,1100 # expected_bucket_owner: AccountId = None,1101 # ) -> None:1102 # self._transform_request_context(context)1103 # call_moto(context)1104 # def delete_bucket_website(1105 # self,1106 # context: RequestContext,1107 # bucket: BucketName,1108 # expected_bucket_owner: AccountId = None,1109 # ) -> None:1110 # self._transform_request_context(context)1111 # call_moto(context)1112 # def delete_object(1113 # self,1114 # context: RequestContext,1115 # bucket: BucketName,1116 # key: ObjectKey,1117 # mfa: MFA = None,1118 # version_id: ObjectVersionId = None,1119 # request_payer: RequestPayer = None,1120 # bypass_governance_retention: BypassGovernanceRetention = None,1121 # expected_bucket_owner: AccountId = None,1122 # ) -> DeleteObjectOutput:1123 # self._transform_request_context(context)1124 # return call_moto(context)1125 # def delete_object_tagging(1126 # self,1127 # context: RequestContext,1128 # bucket: BucketName,1129 # key: ObjectKey,1130 # version_id: ObjectVersionId = None,1131 # expected_bucket_owner: AccountId = None,1132 # ) -> DeleteObjectTaggingOutput:1133 # self._transform_request_context(context)1134 # return call_moto(context)1135 # def delete_objects(1136 # self,1137 # context: RequestContext,1138 # bucket: BucketName,1139 # delete: Delete,1140 # mfa: MFA = None,1141 # request_payer: RequestPayer = None,1142 # bypass_governance_retention: BypassGovernanceRetention = None,1143 # expected_bucket_owner: AccountId = None,1144 # ) -> DeleteObjectsOutput:1145 # self._transform_request_context(context)1146 # return call_moto(context)1147 # def delete_public_access_block(1148 # self,1149 # context: RequestContext,1150 # bucket: 
BucketName,1151 # expected_bucket_owner: AccountId = None,1152 # ) -> None:1153 # self._transform_request_context(context)1154 # call_moto(context)1155 def get_bucket_accelerate_configuration(1156 self,1157 context: RequestContext,1158 bucket: BucketName,1159 expected_bucket_owner: AccountId = None,1160 ) -> GetBucketAccelerateConfigurationOutput:1161 self._transform_request_context(context)1162 if self._is_static_website(context):1163 return self._serve_static_website(context, bucket)1164 return call_moto(context)1165 def get_bucket_acl(1166 self,1167 context: RequestContext,1168 bucket: BucketName,1169 expected_bucket_owner: AccountId = None,1170 ) -> GetBucketAclOutput:1171 self._transform_request_context(context)1172 if self._is_static_website(context):1173 return self._serve_static_website(context, bucket)1174 return call_moto(context)1175 def get_bucket_analytics_configuration(1176 self,1177 context: RequestContext,1178 bucket: BucketName,1179 id: AnalyticsId,1180 expected_bucket_owner: AccountId = None,1181 ) -> GetBucketAnalyticsConfigurationOutput:1182 self._transform_request_context(context)1183 if self._is_static_website(context):1184 return self._serve_static_website(context, bucket)1185 return call_moto(context)1186 def get_bucket_cors(1187 self,1188 context: RequestContext,1189 bucket: BucketName,1190 expected_bucket_owner: AccountId = None,1191 ) -> GetBucketCorsOutput:1192 self._transform_request_context(context)1193 if self._is_static_website(context):1194 return self._serve_static_website(context, bucket)1195 return call_moto(context)1196 def get_bucket_encryption(1197 self,1198 context: RequestContext,1199 bucket: BucketName,1200 expected_bucket_owner: AccountId = None,1201 ) -> GetBucketEncryptionOutput:1202 self._transform_request_context(context)1203 if self._is_static_website(context):1204 return self._serve_static_website(context, bucket)1205 return call_moto(context)1206 def get_bucket_intelligent_tiering_configuration(1207 self, context: 
RequestContext, bucket: BucketName, id: IntelligentTieringId1208 ) -> GetBucketIntelligentTieringConfigurationOutput:1209 self._transform_request_context(context)1210 if self._is_static_website(context):1211 return self._serve_static_website(context, bucket)1212 return call_moto(context)1213 def get_bucket_inventory_configuration(1214 self,1215 context: RequestContext,1216 bucket: BucketName,1217 id: InventoryId,1218 expected_bucket_owner: AccountId = None,1219 ) -> GetBucketInventoryConfigurationOutput:1220 self._transform_request_context(context)1221 if self._is_static_website(context):1222 return self._serve_static_website(context, bucket)1223 return call_moto(context)1224 def get_bucket_lifecycle(1225 self,1226 context: RequestContext,1227 bucket: BucketName,1228 expected_bucket_owner: AccountId = None,1229 ) -> GetBucketLifecycleOutput:1230 self._transform_request_context(context)1231 if self._is_static_website(context):1232 return self._serve_static_website(context, bucket)1233 return call_moto(context)1234 def get_bucket_lifecycle_configuration(1235 self,1236 context: RequestContext,1237 bucket: BucketName,1238 expected_bucket_owner: AccountId = None,1239 ) -> GetBucketLifecycleConfigurationOutput:1240 self._transform_request_context(context)1241 if self._is_static_website(context):1242 return self._serve_static_website(context, bucket)1243 return call_moto(context)1244 def get_bucket_location(1245 self,1246 context: RequestContext,1247 bucket: BucketName,1248 expected_bucket_owner: AccountId = None,1249 ) -> GetBucketLocationOutput:1250 self._transform_request_context(context)1251 if self._is_static_website(context):1252 return self._serve_static_website(context, bucket)1253 return call_moto(context)1254 def get_bucket_logging(1255 self,1256 context: RequestContext,1257 bucket: BucketName,1258 expected_bucket_owner: AccountId = None,1259 ) -> GetBucketLoggingOutput:1260 self._transform_request_context(context)1261 if self._is_static_website(context):1262 
return self._serve_static_website(context, bucket)1263 return call_moto(context)1264 def get_bucket_metrics_configuration(1265 self,1266 context: RequestContext,1267 bucket: BucketName,1268 id: MetricsId,1269 expected_bucket_owner: AccountId = None,1270 ) -> GetBucketMetricsConfigurationOutput:1271 self._transform_request_context(context)1272 if self._is_static_website(context):1273 return self._serve_static_website(context, bucket)1274 return call_moto(context)1275 def get_bucket_notification(1276 self,1277 context: RequestContext,1278 bucket: BucketName,1279 expected_bucket_owner: AccountId = None,1280 ) -> NotificationConfigurationDeprecated:1281 self._transform_request_context(context)1282 if self._is_static_website(context):1283 return self._serve_static_website(context, bucket)1284 return call_moto(context)1285 def get_bucket_notification_configuration(1286 self,1287 context: RequestContext,1288 bucket: BucketName,1289 expected_bucket_owner: AccountId = None,1290 ) -> NotificationConfiguration:1291 self._transform_request_context(context)1292 if self._is_static_website(context):1293 return self._serve_static_website(context, bucket)1294 return call_moto(context)1295 def get_bucket_ownership_controls(1296 self,1297 context: RequestContext,1298 bucket: BucketName,1299 expected_bucket_owner: AccountId = None,1300 ) -> GetBucketOwnershipControlsOutput:1301 self._transform_request_context(context)1302 if self._is_static_website(context):1303 return self._serve_static_website(context, bucket)1304 return call_moto(context)1305 def get_bucket_policy(1306 self,1307 context: RequestContext,1308 bucket: BucketName,1309 expected_bucket_owner: AccountId = None,1310 ) -> GetBucketPolicyOutput:1311 self._transform_request_context(context)1312 if self._is_static_website(context):1313 return self._serve_static_website(context, bucket)1314 return call_moto(context)1315 def get_bucket_policy_status(1316 self,1317 context: RequestContext,1318 bucket: BucketName,1319 
expected_bucket_owner: AccountId = None,1320 ) -> GetBucketPolicyStatusOutput:1321 self._transform_request_context(context)1322 if self._is_static_website(context):1323 return self._serve_static_website(context, bucket)1324 return call_moto(context)1325 def get_bucket_replication(1326 self,1327 context: RequestContext,1328 bucket: BucketName,1329 expected_bucket_owner: AccountId = None,1330 ) -> GetBucketReplicationOutput:1331 self._transform_request_context(context)1332 if self._is_static_website(context):1333 return self._serve_static_website(context, bucket)1334 return call_moto(context)1335 def get_bucket_request_payment(1336 self,1337 context: RequestContext,1338 bucket: BucketName,1339 expected_bucket_owner: AccountId = None,1340 ) -> GetBucketRequestPaymentOutput:1341 self._transform_request_context(context)1342 if self._is_static_website(context):1343 return self._serve_static_website(context, bucket)1344 return call_moto(context)1345 def get_bucket_tagging(1346 self,1347 context: RequestContext,1348 bucket: BucketName,1349 expected_bucket_owner: AccountId = None,1350 ) -> GetBucketTaggingOutput:1351 self._transform_request_context(context)1352 if self._is_static_website(context):1353 return self._serve_static_website(context, bucket)1354 return call_moto(context)1355 def get_bucket_versioning(1356 self,1357 context: RequestContext,1358 bucket: BucketName,1359 expected_bucket_owner: AccountId = None,1360 ) -> GetBucketVersioningOutput:1361 self._transform_request_context(context)1362 if self._is_static_website(context):1363 return self._serve_static_website(context, bucket)1364 return call_moto(context)1365 def get_bucket_website(1366 self,1367 context: RequestContext,1368 bucket: BucketName,1369 expected_bucket_owner: AccountId = None,1370 ) -> GetBucketWebsiteOutput:1371 self._transform_request_context(context)1372 if self._is_static_website(context):1373 return self._serve_static_website(context, bucket)1374 return call_moto(context)1375 def 
get_object(1376 self,1377 context: RequestContext,1378 bucket: BucketName,1379 key: ObjectKey,1380 if_match: IfMatch = None,1381 if_modified_since: IfModifiedSince = None,1382 if_none_match: IfNoneMatch = None,1383 if_unmodified_since: IfUnmodifiedSince = None,1384 range: Range = None,1385 response_cache_control: ResponseCacheControl = None,1386 response_content_disposition: ResponseContentDisposition = None,1387 response_content_encoding: ResponseContentEncoding = None,1388 response_content_language: ResponseContentLanguage = None,1389 response_content_type: ResponseContentType = None,1390 response_expires: ResponseExpires = None,1391 version_id: ObjectVersionId = None,1392 sse_customer_algorithm: SSECustomerAlgorithm = None,1393 sse_customer_key: SSECustomerKey = None,1394 sse_customer_key_md5: SSECustomerKeyMD5 = None,1395 request_payer: RequestPayer = None,1396 part_number: PartNumber = None,1397 expected_bucket_owner: AccountId = None,1398 ) -> GetObjectOutput:1399 self._transform_request_context(context)1400 if self._is_static_website(context):1401 return self._serve_static_website(context, bucket)1402 return call_moto(context)1403 def get_object_acl(1404 self,1405 context: RequestContext,1406 bucket: BucketName,1407 key: ObjectKey,1408 version_id: ObjectVersionId = None,1409 request_payer: RequestPayer = None,1410 expected_bucket_owner: AccountId = None,1411 ) -> GetObjectAclOutput:1412 self._transform_request_context(context)1413 if self._is_static_website(context):1414 return self._serve_static_website(context, bucket)1415 return call_moto(context)1416 def get_object_legal_hold(1417 self,1418 context: RequestContext,1419 bucket: BucketName,1420 key: ObjectKey,1421 version_id: ObjectVersionId = None,1422 request_payer: RequestPayer = None,1423 expected_bucket_owner: AccountId = None,1424 ) -> GetObjectLegalHoldOutput:1425 self._transform_request_context(context)1426 if self._is_static_website(context):1427 return self._serve_static_website(context, 
bucket)1428 return call_moto(context)1429 def get_object_lock_configuration(1430 self,1431 context: RequestContext,1432 bucket: BucketName,1433 expected_bucket_owner: AccountId = None,1434 ) -> GetObjectLockConfigurationOutput:1435 self._transform_request_context(context)1436 if self._is_static_website(context):1437 return self._serve_static_website(context, bucket)1438 return call_moto(context)1439 def get_object_retention(1440 self,1441 context: RequestContext,1442 bucket: BucketName,1443 key: ObjectKey,1444 version_id: ObjectVersionId = None,1445 request_payer: RequestPayer = None,1446 expected_bucket_owner: AccountId = None,1447 ) -> GetObjectRetentionOutput:1448 self._transform_request_context(context)1449 if self._is_static_website(context):1450 return self._serve_static_website(context, bucket)1451 return call_moto(context)1452 def get_object_tagging(1453 self,1454 context: RequestContext,1455 bucket: BucketName,1456 key: ObjectKey,1457 version_id: ObjectVersionId = None,1458 expected_bucket_owner: AccountId = None,1459 request_payer: RequestPayer = None,1460 ) -> GetObjectTaggingOutput:1461 self._transform_request_context(context)1462 if self._is_static_website(context):1463 return self._serve_static_website(context, bucket)1464 return call_moto(context)1465 def get_object_torrent(1466 self,1467 context: RequestContext,1468 bucket: BucketName,1469 key: ObjectKey,1470 request_payer: RequestPayer = None,1471 expected_bucket_owner: AccountId = None,1472 ) -> GetObjectTorrentOutput:1473 self._transform_request_context(context)1474 if self._is_static_website(context):1475 return self._serve_static_website(context, bucket)1476 return call_moto(context)1477 def get_public_access_block(1478 self,1479 context: RequestContext,1480 bucket: BucketName,1481 expected_bucket_owner: AccountId = None,1482 ) -> GetPublicAccessBlockOutput:1483 self._transform_request_context(context)1484 if self._is_static_website(context):1485 return self._serve_static_website(context, 
bucket)1486 return call_moto(context)1487 # def head_bucket(1488 # self,1489 # context: RequestContext,1490 # bucket: BucketName,1491 # expected_bucket_owner: AccountId = None,1492 # ) -> None:1493 # self._transform_request_context(context)1494 # call_moto(context, include_response_metadata=True)1495 # def head_object(1496 # self,1497 # context: RequestContext,1498 # bucket: BucketName,1499 # key: ObjectKey,1500 # if_match: IfMatch = None,1501 # if_modified_since: IfModifiedSince = None,1502 # if_none_match: IfNoneMatch = None,1503 # if_unmodified_since: IfUnmodifiedSince = None,1504 # range: Range = None,1505 # version_id: ObjectVersionId = None,1506 # sse_customer_algorithm: SSECustomerAlgorithm = None,1507 # sse_customer_key: SSECustomerKey = None,1508 # sse_customer_key_md5: SSECustomerKeyMD5 = None,1509 # request_payer: RequestPayer = None,1510 # part_number: PartNumber = None,1511 # expected_bucket_owner: AccountId = None,1512 # ) -> HeadObjectOutput:1513 # self._transform_request_context(context)1514 # return call_moto(context, include_response_metadata=True)1515 # def list_bucket_analytics_configurations(1516 # self,1517 # context: RequestContext,1518 # bucket: BucketName,1519 # continuation_token: Token = None,1520 # expected_bucket_owner: AccountId = None,1521 # ) -> ListBucketAnalyticsConfigurationsOutput:1522 # self._transform_request_context(context)1523 # if self._is_static_website(context):1524 # return self._serve_static_website(context, bucket)1525 # return call_moto(context)1526 # def list_bucket_intelligent_tiering_configurations(1527 # self,1528 # context: RequestContext,1529 # bucket: BucketName,1530 # continuation_token: Token = None,1531 # ) -> ListBucketIntelligentTieringConfigurationsOutput:1532 # self._transform_request_context(context)1533 # if self._is_static_website(context):1534 # return self._serve_static_website(context, bucket)1535 # return call_moto(context)1536 # def list_bucket_inventory_configurations(1537 # self,1538 # context: 
RequestContext,1539 # bucket: BucketName,1540 # continuation_token: Token = None,1541 # expected_bucket_owner: AccountId = None,1542 # ) -> ListBucketInventoryConfigurationsOutput:1543 # self._transform_request_context(context)1544 # return call_moto(context)1545 # def list_bucket_metrics_configurations(1546 # self,1547 # context: RequestContext,1548 # bucket: BucketName,1549 # continuation_token: Token = None,1550 # expected_bucket_owner: AccountId = None,1551 # ) -> ListBucketMetricsConfigurationsOutput:1552 # self._transform_request_context(context)1553 # return call_moto(context)1554 # def list_buckets(1555 # self,1556 # context: RequestContext,1557 # ) -> ListBucketsOutput:1558 # self._transform_request_context(context)1559 # return call_moto(context)1560 # def list_multipart_uploads(1561 # self,1562 # context: RequestContext,1563 # bucket: BucketName,1564 # delimiter: Delimiter = None,1565 # encoding_type: EncodingType = None,1566 # key_marker: KeyMarker = None,1567 # max_uploads: MaxUploads = None,1568 # prefix: Prefix = None,1569 # upload_id_marker: UploadIdMarker = None,1570 # expected_bucket_owner: AccountId = None,1571 # ) -> ListMultipartUploadsOutput:1572 # self._transform_request_context(context)1573 # return call_moto(context)1574 # def list_object_versions(1575 # self,1576 # context: RequestContext,1577 # bucket: BucketName,1578 # delimiter: Delimiter = None,1579 # encoding_type: EncodingType = None,1580 # key_marker: KeyMarker = None,1581 # max_keys: MaxKeys = None,1582 # prefix: Prefix = None,1583 # version_id_marker: VersionIdMarker = None,1584 # expected_bucket_owner: AccountId = None,1585 # ) -> ListObjectVersionsOutput:1586 # self._transform_request_context(context)1587 # return call_moto(context)1588 # def list_objects(1589 # self,1590 # context: RequestContext,1591 # bucket: BucketName,1592 # delimiter: Delimiter = None,1593 # encoding_type: EncodingType = None,1594 # marker: Marker = None,1595 # max_keys: MaxKeys = None,1596 # prefix: 
Prefix = None,1597 # request_payer: RequestPayer = None,1598 # expected_bucket_owner: AccountId = None,1599 # ) -> ListObjectsOutput:1600 # self._transform_request_context(context)1601 # return call_moto(context)1602 # def list_objects_v2(1603 # self,1604 # context: RequestContext,1605 # bucket: BucketName,1606 # delimiter: Delimiter = None,1607 # encoding_type: EncodingType = None,1608 # max_keys: MaxKeys = None,1609 # prefix: Prefix = None,1610 # continuation_token: Token = None,1611 # fetch_owner: FetchOwner = None,1612 # start_after: StartAfter = None,1613 # request_payer: RequestPayer = None,1614 # expected_bucket_owner: AccountId = None,1615 # ) -> ListObjectsV2Output:1616 # self._transform_request_context(context)1617 # return call_moto(context, include_response_metadata=True)1618 # def list_parts(1619 # self,1620 # context: RequestContext,1621 # bucket: BucketName,1622 # key: ObjectKey,1623 # upload_id: MultipartUploadId,1624 # max_parts: MaxParts = None,1625 # part_number_marker: PartNumberMarker = None,1626 # request_payer: RequestPayer = None,1627 # expected_bucket_owner: AccountId = None,1628 # ) -> ListPartsOutput:1629 # self._transform_request_context(context)1630 # return call_moto(context)1631 def put_bucket_accelerate_configuration(1632 self,1633 context: RequestContext,1634 bucket: BucketName,1635 accelerate_configuration: AccelerateConfiguration,1636 expected_bucket_owner: AccountId = None,1637 ) -> None:1638 self._transform_request_context(context)1639 self._transform_content_types(context)1640 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1641 call_moto(context)1642 def put_bucket_acl(1643 self,1644 context: RequestContext,1645 bucket: BucketName,1646 acl: BucketCannedACL = None,1647 access_control_policy: AccessControlPolicy = None,1648 content_md5: ContentMD5 = None,1649 grant_full_control: GrantFullControl = None,1650 grant_read: GrantRead = None,1651 grant_read_acp: GrantReadACP = None,1652 grant_write: GrantWrite = 
None,1653 grant_write_acp: GrantWriteACP = None,1654 expected_bucket_owner: AccountId = None,1655 ) -> None:1656 self._transform_request_context(context)1657 self._transform_content_types(context)1658 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1659 self._raise_if_invalid_content_md5(context, content_md5)1660 call_moto(context)1661 def put_bucket_analytics_configuration(1662 self,1663 context: RequestContext,1664 bucket: BucketName,1665 id: AnalyticsId,1666 analytics_configuration: AnalyticsConfiguration,1667 expected_bucket_owner: AccountId = None,1668 ) -> None:1669 self._transform_request_context(context)1670 self._transform_content_types(context)1671 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1672 call_moto(context)1673 def put_bucket_cors(1674 self,1675 context: RequestContext,1676 bucket: BucketName,1677 cors_configuration: CORSConfiguration,1678 content_md5: ContentMD5 = None,1679 expected_bucket_owner: AccountId = None,1680 ) -> None:1681 self._transform_request_context(context)1682 self._transform_content_types(context)1683 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1684 self._raise_if_invalid_content_md5(context, content_md5)1685 call_moto(context)1686 def put_bucket_encryption(1687 self,1688 context: RequestContext,1689 bucket: BucketName,1690 server_side_encryption_configuration: ServerSideEncryptionConfiguration,1691 content_md5: ContentMD5 = None,1692 expected_bucket_owner: AccountId = None,1693 ) -> None:1694 self._transform_request_context(context)1695 self._transform_content_types(context)1696 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1697 self._raise_if_invalid_content_md5(context, content_md5)1698 call_moto(context)1699 def put_bucket_intelligent_tiering_configuration(1700 self,1701 context: RequestContext,1702 bucket: BucketName,1703 id: IntelligentTieringId,1704 intelligent_tiering_configuration: IntelligentTieringConfiguration,1705 ) -> None:1706 
self._transform_request_context(context)1707 self._transform_content_types(context)1708 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1709 call_moto(context)1710 def put_bucket_inventory_configuration(1711 self,1712 context: RequestContext,1713 bucket: BucketName,1714 id: InventoryId,1715 inventory_configuration: InventoryConfiguration,1716 expected_bucket_owner: AccountId = None,1717 ) -> None:1718 self._transform_request_context(context)1719 self._transform_content_types(context)1720 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1721 call_moto(context)1722 def put_bucket_lifecycle(1723 self,1724 context: RequestContext,1725 bucket: BucketName,1726 content_md5: ContentMD5 = None,1727 lifecycle_configuration: LifecycleConfiguration = None,1728 expected_bucket_owner: AccountId = None,1729 ) -> None:1730 self._transform_request_context(context)1731 self._transform_content_types(context)1732 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1733 self._raise_if_invalid_content_md5(context, content_md5)1734 call_moto(context)1735 def put_bucket_lifecycle_configuration(1736 self,1737 context: RequestContext,1738 bucket: BucketName,1739 lifecycle_configuration: BucketLifecycleConfiguration = None,1740 expected_bucket_owner: AccountId = None,1741 ) -> None:1742 self._transform_request_context(context)1743 self._transform_content_types(context)1744 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1745 call_moto(context)1746 def put_bucket_logging(1747 self,1748 context: RequestContext,1749 bucket: BucketName,1750 bucket_logging_status: BucketLoggingStatus,1751 content_md5: ContentMD5 = None,1752 expected_bucket_owner: AccountId = None,1753 ) -> None:1754 self._transform_request_context(context)1755 self._transform_content_types(context)1756 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1757 self._raise_if_invalid_content_md5(context, content_md5)1758 call_moto(context)1759 def 
put_bucket_metrics_configuration(1760 self,1761 context: RequestContext,1762 bucket: BucketName,1763 id: MetricsId,1764 metrics_configuration: MetricsConfiguration,1765 expected_bucket_owner: AccountId = None,1766 ) -> None:1767 self._transform_request_context(context)1768 self._transform_content_types(context)1769 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1770 call_moto(context)1771 def put_bucket_notification(1772 self,1773 context: RequestContext,1774 bucket: BucketName,1775 notification_configuration: NotificationConfigurationDeprecated,1776 content_md5: ContentMD5 = None,1777 expected_bucket_owner: AccountId = None,1778 ) -> None:1779 self._transform_request_context(context)1780 self._transform_content_types(context)1781 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1782 self._raise_if_invalid_content_md5(context, content_md5)1783 call_moto(context)1784 def put_bucket_notification_configuration(1785 self,1786 context: RequestContext,1787 bucket: BucketName,1788 notification_configuration: NotificationConfiguration,1789 expected_bucket_owner: AccountId = None,1790 skip_destination_validation: SkipValidation = None,1791 ) -> None:1792 self._transform_request_context(context)1793 self._transform_content_types(context)1794 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1795 call_moto(context)1796 def put_bucket_ownership_controls(1797 self,1798 context: RequestContext,1799 bucket: BucketName,1800 ownership_controls: OwnershipControls,1801 content_md5: ContentMD5 = None,1802 expected_bucket_owner: AccountId = None,1803 ) -> None:1804 self._transform_request_context(context)1805 self._transform_content_types(context)1806 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1807 self._raise_if_invalid_content_md5(context, content_md5)1808 call_moto(context)1809 def put_bucket_policy(1810 self,1811 context: RequestContext,1812 bucket: BucketName,1813 policy: Policy,1814 content_md5: ContentMD5 = None,1815 
confirm_remove_self_bucket_access: ConfirmRemoveSelfBucketAccess = None,1816 expected_bucket_owner: AccountId = None,1817 ) -> None:1818 self._transform_request_context(context)1819 self._transform_content_types(context)1820 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1821 self._raise_if_invalid_content_md5(context, content_md5)1822 call_moto(context)1823 def put_bucket_replication(1824 self,1825 context: RequestContext,1826 bucket: BucketName,1827 replication_configuration: ReplicationConfiguration,1828 content_md5: ContentMD5 = None,1829 token: ObjectLockToken = None,1830 expected_bucket_owner: AccountId = None,1831 ) -> None:1832 self._transform_request_context(context)1833 self._transform_content_types(context)1834 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1835 self._raise_if_invalid_content_md5(context, content_md5)1836 call_moto(context)1837 def put_bucket_request_payment(1838 self,1839 context: RequestContext,1840 bucket: BucketName,1841 request_payment_configuration: RequestPaymentConfiguration,1842 content_md5: ContentMD5 = None,1843 expected_bucket_owner: AccountId = None,1844 ) -> None:1845 self._transform_request_context(context)1846 self._transform_content_types(context)1847 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1848 self._raise_if_invalid_content_md5(context, content_md5)1849 call_moto(context)1850 def put_bucket_tagging(1851 self,1852 context: RequestContext,1853 bucket: BucketName,1854 tagging: Tagging,1855 content_md5: ContentMD5 = None,1856 expected_bucket_owner: AccountId = None,1857 ) -> None:1858 self._transform_request_context(context)1859 self._transform_content_types(context)1860 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1861 self._raise_if_invalid_content_md5(context, content_md5)1862 call_moto(context)1863 def put_bucket_versioning(1864 self,1865 context: RequestContext,1866 bucket: BucketName,1867 versioning_configuration: VersioningConfiguration,1868 
content_md5: ContentMD5 = None,1869 mfa: MFA = None,1870 expected_bucket_owner: AccountId = None,1871 ) -> None:1872 self._transform_request_context(context)1873 self._transform_content_types(context)1874 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1875 self._raise_if_invalid_content_md5(context, content_md5)1876 call_moto(context)1877 def put_bucket_website(1878 self,1879 context: RequestContext,1880 bucket: BucketName,1881 website_configuration: WebsiteConfiguration,1882 content_md5: ContentMD5 = None,1883 expected_bucket_owner: AccountId = None,1884 ) -> None:1885 self._transform_request_context(context)1886 self._transform_content_types(context)1887 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1888 self._raise_if_invalid_content_md5(context, content_md5)1889 call_moto(context)1890 def put_object(1891 self,1892 context: RequestContext,1893 bucket: BucketName,1894 key: ObjectKey,1895 acl: ObjectCannedACL = None,1896 body: Body = None,1897 cache_control: CacheControl = None,1898 content_disposition: ContentDisposition = None,1899 content_encoding: ContentEncoding = None,1900 content_language: ContentLanguage = None,1901 content_length: ContentLength = None,1902 content_md5: ContentMD5 = None,1903 content_type: ContentType = None,1904 expires: Expires = None,1905 grant_full_control: GrantFullControl = None,1906 grant_read: GrantRead = None,1907 grant_read_acp: GrantReadACP = None,1908 grant_write_acp: GrantWriteACP = None,1909 metadata: Metadata = None,1910 server_side_encryption: ServerSideEncryption = None,1911 storage_class: StorageClass = None,1912 website_redirect_location: WebsiteRedirectLocation = None,1913 sse_customer_algorithm: SSECustomerAlgorithm = None,1914 sse_customer_key: SSECustomerKey = None,1915 sse_customer_key_md5: SSECustomerKeyMD5 = None,1916 ssekms_key_id: SSEKMSKeyId = None,1917 ssekms_encryption_context: SSEKMSEncryptionContext = None,1918 bucket_key_enabled: BucketKeyEnabled = None,1919 request_payer: 
RequestPayer = None,1920 tagging: TaggingHeader = None,1921 object_lock_mode: ObjectLockMode = None,1922 object_lock_retain_until_date: ObjectLockRetainUntilDate = None,1923 object_lock_legal_hold_status: ObjectLockLegalHoldStatus = None,1924 expected_bucket_owner: AccountId = None,1925 ) -> PutObjectOutput:1926 self._transform_request_context(context)1927 self._transform_content_types(context)1928 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1929 self._raise_if_invalid_content_md5(context, content_md5)1930 return call_moto(context)1931 def put_object_acl(1932 self,1933 context: RequestContext,1934 bucket: BucketName,1935 key: ObjectKey,1936 acl: ObjectCannedACL = None,1937 access_control_policy: AccessControlPolicy = None,1938 content_md5: ContentMD5 = None,1939 grant_full_control: GrantFullControl = None,1940 grant_read: GrantRead = None,1941 grant_read_acp: GrantReadACP = None,1942 grant_write: GrantWrite = None,1943 grant_write_acp: GrantWriteACP = None,1944 request_payer: RequestPayer = None,1945 version_id: ObjectVersionId = None,1946 expected_bucket_owner: AccountId = None,1947 ) -> PutObjectAclOutput:1948 self._transform_request_context(context)1949 self._transform_content_types(context)1950 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1951 self._raise_if_invalid_content_md5(context, content_md5)1952 return call_moto(context)1953 def put_object_legal_hold(1954 self,1955 context: RequestContext,1956 bucket: BucketName,1957 key: ObjectKey,1958 legal_hold: ObjectLockLegalHold = None,1959 request_payer: RequestPayer = None,1960 version_id: ObjectVersionId = None,1961 content_md5: ContentMD5 = None,1962 expected_bucket_owner: AccountId = None,1963 ) -> PutObjectLegalHoldOutput:1964 self._transform_request_context(context)1965 self._transform_content_types(context)1966 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1967 self._raise_if_invalid_content_md5(context, content_md5)1968 return call_moto(context)1969 
def put_object_lock_configuration(1970 self,1971 context: RequestContext,1972 bucket: BucketName,1973 object_lock_configuration: ObjectLockConfiguration = None,1974 request_payer: RequestPayer = None,1975 token: ObjectLockToken = None,1976 content_md5: ContentMD5 = None,1977 expected_bucket_owner: AccountId = None,1978 ) -> PutObjectLockConfigurationOutput:1979 self._transform_request_context(context)1980 self._transform_content_types(context)1981 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1982 self._raise_if_invalid_content_md5(context, content_md5)1983 return call_moto(context)1984 def put_object_retention(1985 self,1986 context: RequestContext,1987 bucket: BucketName,1988 key: ObjectKey,1989 retention: ObjectLockRetention = None,1990 request_payer: RequestPayer = None,1991 version_id: ObjectVersionId = None,1992 bypass_governance_retention: BypassGovernanceRetention = None,1993 content_md5: ContentMD5 = None,1994 expected_bucket_owner: AccountId = None,1995 ) -> PutObjectRetentionOutput:1996 self._transform_request_context(context)1997 self._transform_content_types(context)1998 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)1999 self._raise_if_invalid_content_md5(context, content_md5)2000 return call_moto(context)2001 def put_object_tagging(2002 self,2003 context: RequestContext,2004 bucket: BucketName,2005 key: ObjectKey,2006 tagging: Tagging,2007 version_id: ObjectVersionId = None,2008 content_md5: ContentMD5 = None,2009 expected_bucket_owner: AccountId = None,2010 request_payer: RequestPayer = None,2011 ) -> PutObjectTaggingOutput:2012 self._transform_request_context(context)2013 self._transform_content_types(context)2014 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)2015 self._raise_if_invalid_content_md5(context, content_md5)2016 return call_moto(context)2017 def put_public_access_block(2018 self,2019 context: RequestContext,2020 bucket: BucketName,2021 public_access_block_configuration: 
PublicAccessBlockConfiguration,2022 content_md5: ContentMD5 = None,2023 expected_bucket_owner: AccountId = None,2024 ) -> None:2025 self._transform_request_context(context)2026 self._transform_content_types(context)2027 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)2028 self._raise_if_invalid_content_md5(context, content_md5)2029 call_moto(context)2030 # def restore_object(2031 # self,2032 # context: RequestContext,2033 # bucket: BucketName,2034 # key: ObjectKey,2035 # version_id: ObjectVersionId = None,2036 # restore_request: RestoreRequest = None,2037 # request_payer: RequestPayer = None,2038 # expected_bucket_owner: AccountId = None,2039 # ) -> RestoreObjectOutput:2040 # self._transform_request_context(context)2041 # return call_moto(context)2042 # def select_object_content(2043 # self,2044 # context: RequestContext,2045 # bucket: BucketName,2046 # key: ObjectKey,2047 # expression: Expression,2048 # expression_type: ExpressionType,2049 # input_serialization: InputSerialization,2050 # output_serialization: OutputSerialization,2051 # sse_customer_algorithm: SSECustomerAlgorithm = None,2052 # sse_customer_key: SSECustomerKey = None,2053 # sse_customer_key_md5: SSECustomerKeyMD5 = None,2054 # request_progress: RequestProgress = None,2055 # scan_range: ScanRange = None,2056 # expected_bucket_owner: AccountId = None,2057 # ) -> SelectObjectContentOutput:2058 # self._transform_request_context(context)2059 # return call_moto(context)2060 def upload_part(2061 self,2062 context: RequestContext,2063 bucket: BucketName,2064 key: ObjectKey,2065 part_number: PartNumber,2066 upload_id: MultipartUploadId,2067 body: Body = None,2068 content_length: ContentLength = None,2069 content_md5: ContentMD5 = None,2070 sse_customer_algorithm: SSECustomerAlgorithm = None,2071 sse_customer_key: SSECustomerKey = None,2072 sse_customer_key_md5: SSECustomerKeyMD5 = None,2073 request_payer: RequestPayer = None,2074 expected_bucket_owner: AccountId = None,2075 ) -> 
UploadPartOutput:2076 self._transform_request_context(context)2077 self._transform_content_types(context)2078 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)2079 return call_moto(context)2080 def upload_part_copy(2081 self,2082 context: RequestContext,2083 bucket: BucketName,2084 copy_source: CopySource,2085 key: ObjectKey,2086 part_number: PartNumber,2087 upload_id: MultipartUploadId,2088 copy_source_if_match: CopySourceIfMatch = None,2089 copy_source_if_modified_since: CopySourceIfModifiedSince = None,2090 copy_source_if_none_match: CopySourceIfNoneMatch = None,2091 copy_source_if_unmodified_since: CopySourceIfUnmodifiedSince = None,2092 copy_source_range: CopySourceRange = None,2093 sse_customer_algorithm: SSECustomerAlgorithm = None,2094 sse_customer_key: SSECustomerKey = None,2095 sse_customer_key_md5: SSECustomerKeyMD5 = None,2096 copy_source_sse_customer_algorithm: CopySourceSSECustomerAlgorithm = None,2097 copy_source_sse_customer_key: CopySourceSSECustomerKey = None,2098 copy_source_sse_customer_key_md5: CopySourceSSECustomerKeyMD5 = None,2099 request_payer: RequestPayer = None,2100 expected_bucket_owner: AccountId = None,2101 expected_source_bucket_owner: AccountId = None,2102 ) -> UploadPartCopyOutput:2103 self._transform_request_context(context)2104 self._transform_content_types(context)2105 self._raise_if_invalid_bucket_name(context, bucket_name=bucket)2106 return call_moto(context)2107 # def write_get_object_response(2108 # self,2109 # context: RequestContext,2110 # request_route: RequestRoute,2111 # request_token: RequestToken,2112 # body: Body = None,2113 # status_code: GetObjectResponseStatusCode = None,2114 # error_code: ErrorCode = None,2115 # error_message: ErrorMessage = None,2116 # accept_ranges: AcceptRanges = None,2117 # cache_control: CacheControl = None,2118 # content_disposition: ContentDisposition = None,2119 # content_encoding: ContentEncoding = None,2120 # content_language: ContentLanguage = None,2121 # content_length: 
ContentLength = None,2122 # content_range: ContentRange = None,2123 # content_type: ContentType = None,2124 # delete_marker: DeleteMarker = None,2125 # e_tag: ETag = None,2126 # expires: Expires = None,2127 # expiration: Expiration = None,2128 # last_modified: LastModified = None,2129 # missing_meta: MissingMeta = None,2130 # metadata: Metadata = None,2131 # object_lock_mode: ObjectLockMode = None,2132 # object_lock_legal_hold_status: ObjectLockLegalHoldStatus = None,2133 # object_lock_retain_until_date: ObjectLockRetainUntilDate = None,2134 # parts_count: PartsCount = None,2135 # replication_status: ReplicationStatus = None,2136 # request_charged: RequestCharged = None,2137 # restore: Restore = None,2138 # server_side_encryption: ServerSideEncryption = None,2139 # sse_customer_algorithm: SSECustomerAlgorithm = None,2140 # ssekms_key_id: SSEKMSKeyId = None,2141 # sse_customer_key_md5: SSECustomerKeyMD5 = None,2142 # storage_class: StorageClass = None,2143 # tag_count: TagCount = None,2144 # version_id: ObjectVersionId = None,2145 # bucket_key_enabled: BucketKeyEnabled = None,2146 # ) -> None:2147 # self._transform_request_context(context)2148 # call_moto(context)2149def s3_update_acls(self, request, query, bucket_name, key_name):2150 # fix for - https://github.com/localstack/localstack/issues/17332151 # - https://github.com/localstack/localstack/issues/11702152 acl_key = "acl|%s|%s" % (bucket_name, key_name)2153 acl = self._acl_from_headers(request.headers)2154 if acl:2155 TMP_STATE[acl_key] = acl2156 if not query.get("uploadId"):2157 return2158 bucket = self.backend.get_bucket(bucket_name)2159 key = bucket and self.backend.get_object(bucket_name, key_name)2160 if not key:2161 return2162 acl = acl or TMP_STATE.pop(acl_key, None) or bucket.acl...

Full Screen

Full Screen

test_moto.py

Source:test_moto.py Github

copy

Full Screen

...5from localstack.services.moto import MotoFallbackDispatcher6from localstack.utils.common import short_uid, to_str7def test_call_with_sqs_creates_state_correctly():8 qname = f"queue-{short_uid()}"9 response = moto.call_moto(10 moto.create_aws_request_context("sqs", "CreateQueue", {"QueueName": qname})11 )12 url = response["QueueUrl"]13 try:14 assert response["ResponseMetadata"]["HTTPStatusCode"] == 20015 assert response["QueueUrl"].endswith(f"/{qname}")16 response = moto.call_moto(moto.create_aws_request_context("sqs", "ListQueues"))17 assert url in response["QueueUrls"]18 finally:19 moto.call_moto(moto.create_aws_request_context("sqs", "DeleteQueue", {"QueueUrl": url}))20 response = moto.call_moto(moto.create_aws_request_context("sqs", "ListQueues"))21 assert url not in response.get("QueueUrls", [])22def test_call_sqs_invalid_call_raises_http_exception():23 with pytest.raises(ServiceException) as e:24 moto.call_moto(25 moto.create_aws_request_context(26 "sqs",27 "DeleteQueue",28 {29 "QueueUrl": "http://0.0.0.0/nonexistingqueue",30 },31 )32 )33 e.match("The specified queue does not exist")34def test_call_non_implemented_operation():35 with pytest.raises(NotImplementedError):36 # we'll need to keep finding methods that moto doesn't implement ;-)37 moto.call_moto(38 moto.create_aws_request_context("athena", "DeleteDataCatalog", {"Name": "foo"})39 )40def test_proxy_non_implemented_operation():41 with pytest.raises(NotImplementedError):42 moto.proxy_moto(43 moto.create_aws_request_context("athena", "DeleteDataCatalog", {"Name": "foo"})44 )45def test_call_with_sqs_modifies_state_in_moto_backend():46 """Whitebox test to check that moto backends are populated correctly"""47 from moto.sqs.models import sqs_backends48 qname = f"queue-{short_uid()}"49 response = moto.call_moto(50 moto.create_aws_request_context("sqs", "CreateQueue", {"QueueName": qname})51 )52 url = response["QueueUrl"]53 assert qname in sqs_backends.get(config.AWS_REGION_US_EAST_1).queues54 
moto.call_moto(moto.create_aws_request_context("sqs", "DeleteQueue", {"QueueUrl": url}))55 assert qname not in sqs_backends.get(config.AWS_REGION_US_EAST_1).queues56def test_call_with_es_creates_state_correctly():57 domain_name = f"domain-{short_uid()}"58 response = moto.call_moto(59 moto.create_aws_request_context(60 "es",61 "CreateElasticsearchDomain",62 {63 "DomainName": domain_name,64 "ElasticsearchVersion": "7.10",65 },66 )67 )68 try:69 assert response["ResponseMetadata"]["HTTPStatusCode"] == 20070 assert response["DomainStatus"]["DomainName"] == domain_name71 assert response["DomainStatus"]["ElasticsearchVersion"] == "7.10"72 finally:73 response = moto.call_moto(74 moto.create_aws_request_context(75 "es", "DeleteElasticsearchDomain", {"DomainName": domain_name}76 )77 )78 assert response["ResponseMetadata"]["HTTPStatusCode"] == 20079def test_call_multi_region_backends():80 from moto.sqs.models import sqs_backends81 qname_us = f"queue-us-{short_uid()}"82 qname_eu = f"queue-eu-{short_uid()}"83 moto.call_moto(84 moto.create_aws_request_context(85 "sqs", "CreateQueue", {"QueueName": qname_us}, region="us-east-1"86 )87 )88 moto.call_moto(89 moto.create_aws_request_context(90 "sqs", "CreateQueue", {"QueueName": qname_eu}, region="eu-central-1"91 )92 )93 assert qname_us in sqs_backends.get("us-east-1").queues94 assert qname_eu not in sqs_backends.get("us-east-1").queues95 assert qname_us not in sqs_backends.get("eu-central-1").queues96 assert qname_eu in sqs_backends.get("eu-central-1").queues97 del sqs_backends.get("us-east-1").queues[qname_us]98 del sqs_backends.get("eu-central-1").queues[qname_eu]99def test_proxy_with_sqs_invalid_call_returns_error():100 response = moto.proxy_moto(101 moto.create_aws_request_context(102 "sqs",103 "DeleteQueue",104 {105 "QueueUrl": "http://0.0.0.0/nonexistingqueue",106 },107 )108 )109 assert response.status_code == 400110 assert "NonExistentQueue" in to_str(response.data)111def test_proxy_with_sqs_returns_http_response():112 qname 
= f"queue-{short_uid()}"113 response = moto.proxy_moto(114 moto.create_aws_request_context("sqs", "CreateQueue", {"QueueName": qname})115 )116 assert response.status_code == 200117 assert f"{qname}</QueueUrl>" in to_str(response.data)118 assert "x-amzn-requestid" in response.headers119class FakeSqsApi:120 @handler("ListQueues", expand=False)121 def list_queues(self, context, request):122 raise NotImplementedError123 @handler("CreateQueue", expand=False)124 def create_queue(self, context, request):125 raise NotImplementedError126class FakeSqsProvider(FakeSqsApi):127 def __init__(self) -> None:128 super().__init__()129 self.calls = []130 @handler("ListQueues", expand=False)131 def list_queues(self, context, request):132 self.calls.append(context)133 return moto.call_moto(context)134def test_moto_fallback_dispatcher():135 provider = FakeSqsProvider()136 dispatcher = MotoFallbackDispatcher(provider)137 assert "ListQueues" in dispatcher138 assert "CreateQueue" in dispatcher139 def _dispatch(action, params):140 context = moto.create_aws_request_context("sqs", action, params)141 return dispatcher[action](context, params)142 qname = f"queue-{short_uid()}"143 # when falling through the dispatcher returns an HTTP response144 http_response = _dispatch("CreateQueue", {"QueueName": qname})145 assert http_response.status_code == 200146 # this returns an147 response = _dispatch("ListQueues", None)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to run automation tests from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful