How to use the head_bucket method in LocalStack

Python code snippets using localstack
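
In S3 terms, head_bucket sends a HEAD request for a bucket: it returns no body, succeeds if the bucket exists and you can access it, and raises a ClientError otherwise (404 for a missing bucket, 403 for denied access). As orientation before the snippets, here is a minimal sketch; it assumes LocalStack is listening on its default edge port 4566, and the bucket name and dummy credentials are placeholders.

# Minimal sketch of head_bucket against LocalStack. Assumes LocalStack is
# running on its default edge port 4566; "my-bucket" and the dummy
# credentials are placeholders.
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:4566",  # LocalStack's default edge endpoint
    aws_access_key_id="test",
    aws_secret_access_key="test",
    region_name="us-east-1",
)

s3.create_bucket(Bucket="my-bucket")

try:
    s3.head_bucket(Bucket="my-bucket")  # no body; success means "exists and accessible"
    print("bucket exists")
except ClientError as e:
    # a missing bucket surfaces as a 404, denied access as a 403
    print("head_bucket failed:", e.response["Error"]["Code"])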

test.py

Source: test.py (GitHub)


# -*- coding: utf-8 -*-
# @Time   : 2021-11-30 14:52
# @Email  : zhilishen@smail.nju.edu.cn
# @Author : Zhili Shen
# @File   : test.py
# @Notice :
import collections

# Bucket structure (head_bucket is the bucket whose nodes have the fewest
# accesses; within a bucket, head is the most recently accessed node and
# tail the earliest):
#
#                       next       next
# bucket 1: node1(head) --> node2 --> node3(tail)
#   |  ^                <--       <--
# down |up             last      last
#   v  |
# bucket 2: node4(head) --> node5 --> node6(tail)
#   |  ^
# down |up
#   v  |
# bucket 3: node7(head) --> node8 --> node9(tail)


class Node:
    def __init__(self, key, value):
        self.key = key
        self.value = value
        self.last = None
        self.next = None
        self.times = 0


class Bucket:
    def __init__(self, node: Node):
        self.head = node  # most recently accessed node
        self.tail = node  # earliest accessed node
        self.up = None
        self.down = None

    def add_node_to_head(self, node: Node):
        # A node only enters a bucket on a set or get, so it becomes the head
        # node to mark its access as the most recent.
        node.next = self.head
        self.head.last = node
        self.head = node

    def is_empty(self):
        return self.head is None

    def delete_node(self, node: Node):
        if self.head is self.tail:  # only one node left, so delete that one
            self.head = None
            self.tail = None
        else:
            if node is self.head:  # the node to delete is the head
                self.head = node.next
                self.head.last = None
            elif node is self.tail:  # the node to delete is the tail
                self.tail = node.last
                self.tail.next = None
            else:
                node.last.next = node.next
                node.next.last = node.last
        node.last = None  # detach the node completely
        node.next = None


class LFU:
    def __init__(self, capacity: int):
        self.capacity = capacity
        self.size = 0
        self.key2node = {}
        self.node2bucket = {}
        self.head_bucket = None  # the bucket with the fewest accesses

    def set(self, key: int, value: int):
        if self.capacity == 0:
            return
        if key in self.key2node:
            node = self.key2node[key]
            node.value = value
            node.times += 1
            bucket = self.node2bucket[node]
            self.move(node, bucket)
        else:
            if self.size == self.capacity:  # cache is full, so evict first
                node = self.head_bucket.tail
                self.head_bucket.delete_node(node)
                self.modify_bucket(self.head_bucket)
                # This is why Node stores its key: so the evicted node can be
                # removed from self.key2node.
                self.key2node.pop(node.key)
                self.node2bucket.pop(node)
                self.size -= 1
            node = Node(key, value)  # a new node has the lowest access count, so it belongs in head_bucket
            if self.head_bucket is None:
                self.head_bucket = Bucket(node)
            else:
                if self.head_bucket.head.times == node.times:
                    # head_bucket matches the access count, so the node can
                    # join it directly
                    self.head_bucket.add_node_to_head(node)
                else:
                    # head_bucket does not match, so the node's new bucket
                    # becomes the head_bucket
                    new_bucket = Bucket(node)
                    new_bucket.down = self.head_bucket
                    self.head_bucket.up = new_bucket
                    self.head_bucket = new_bucket
            self.key2node[key] = node
            self.node2bucket[node] = self.head_bucket
            self.size += 1

    def get(self, key):
        if key not in self.key2node:
            return None
        node = self.key2node[key]
        node.times += 1
        cur_bucket = self.node2bucket[node]
        self.move(node, cur_bucket)
        return node.value

    # remove_node_bucket has just lost a node; report whether it is now empty
    # while keeping the bucket list doubly linked.
    def modify_bucket(self, remove_node_bucket: Bucket):
        if remove_node_bucket.is_empty():  # the bucket became empty
            if self.head_bucket is remove_node_bucket:  # it was the least-accessed bucket
                self.head_bucket = remove_node_bucket.down  # promote the next bucket
                if self.head_bucket is not None:  # the cache still has buckets
                    self.head_bucket.up = None  # the new head bucket has nothing above it
            else:  # the empty bucket sits in the middle of the list
                remove_node_bucket.up.down = remove_node_bucket.down  # unlink the bucket
                if remove_node_bucket.down is not None:
                    remove_node_bucket.down.up = remove_node_bucket.up  # point the bucket below back at the bucket above
            return True  # the bucket that lost a node became empty
        else:
            return False  # the bucket that lost a node is not empty

    # The node's access count has just gone up by one, so remove it from its
    # bucket and place it in the next one, keeping both the bucket list and
    # the node lists doubly linked.
    def move(self, node: Node, bucket: Bucket):
        bucket.delete_node(node)  # first remove the node from its bucket
        if self.modify_bucket(bucket):  # the old bucket became empty
            pre_bucket = bucket.up  # a new bucket would hang below the bucket above the removed one
        else:
            pre_bucket = bucket  # otherwise it hangs below the old bucket itself
        next_bucket = bucket.down  # the bucket that might take the node now
        if next_bucket is None:  # no such bucket exists
            new_bucket = Bucket(node)  # so create one
            if pre_bucket is not None:  # link pre_bucket and new_bucket
                pre_bucket.down = new_bucket
                new_bucket.up = pre_bucket
            if self.head_bucket is None:  # no head bucket yet, so record this one
                self.head_bucket = new_bucket
            self.node2bucket[node] = new_bucket  # don't forget to update the mapping
        else:
            if next_bucket.head.times == node.times:  # next_bucket matches the count, so add directly
                next_bucket.add_node_to_head(node)
                self.node2bucket[node] = next_bucket
            else:  # next_bucket does not match, so insert a new bucket between
                new_bucket = Bucket(node)
                if pre_bucket is not None:
                    pre_bucket.down = new_bucket
                    new_bucket.up = pre_bucket  # link pre_bucket <-> new_bucket <-> next_bucket
                new_bucket.down = next_bucket
                next_bucket.up = new_bucket
                if self.head_bucket is next_bucket:  # the new bucket now sits on top
                    self.head_bucket = new_bucket
                self.node2bucket[node] = new_bucket


def create_bucket():
    fake_head = Node(0, 0)
    fake_tail = Node(0, 0)
    fake_head.next = fake_tail
    fake_tail.last = fake_head
    return fake_head, fake_tail


class LFU_1:
    def __init__(self, capacity: int):
        self.capacity = capacity
        self.size = 0
        self.min_freq = 0
        # a plain dict raises KeyError for a missing key; defaultdict supplies
        # a fresh (head, tail) sentinel pair instead
        self.freq2bucket = collections.defaultdict(create_bucket)
        self.key2node = {}

    def delete(self, node: Node):
        # set() may pass in a freshly created node or an existing one, so the
        # node's links decide whether it actually needs unlinking.
        if node.last is not None:
            node.last.next = node.next
            node.next.last = node.last
            # if the node's bucket is now empty, drop it from the dict (the
            # original compared node.last against the (head, tail) tuple
            # itself, which is never true; [0] is the intended head sentinel)
            if node.last is self.freq2bucket[node.times][0] and node.next is self.freq2bucket[node.times][-1]:
                self.freq2bucket.pop(node.times)
        return node

    def increase(self, node: Node):
        # Unlink first, then bump the count; the original bumped the count
        # before delete(), which made delete() inspect the wrong bucket.
        self.delete(node)
        node.times += 1
        pre_node = self.freq2bucket[node.times][-1].last  # node just before the tail sentinel
        node.last = pre_node
        node.next = pre_node.next
        pre_node.next.last = node
        pre_node.next = node
        if node.times == 1:
            self.min_freq = 1
        elif self.min_freq == node.times - 1:
            # if the bucket at the old minimum frequency is now empty, the
            # minimum frequency must be updated
            head, tail = self.freq2bucket[self.min_freq]
            if head.next is tail:
                self.min_freq = node.times

    def get(self, key: int):
        if key in self.key2node:
            self.increase(self.key2node[key])
            return self.key2node[key].value
        return None

    def set(self, key, value):
        if self.capacity == 0:
            return
        if key in self.key2node:
            node = self.key2node[key]
            node.value = value
        else:
            node = Node(key, value)
            self.key2node[key] = node
            self.size += 1
            if self.size > self.capacity:
                self.size -= 1
                delete_node = self.delete(self.freq2bucket[self.min_freq][0].next)
                self.key2node.pop(delete_node.key)
        self.increase(node)


if __name__ == "__main__":
    a, b = map(int, input().split())
    lfu = LFU_1(b)
    for _ in range(a):
        a, *b = map(int, input().split())
        if a == 1:
            lfu.set(b[0], b[-1])
        else:
...
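
Note that head_bucket in this snippet is only the attribute name the LFU cache gives to its least-frequently-accessed bucket; it is unrelated to the S3 operation of the same name. A short usage sketch of the LFU class above, with arbitrary keys and values:

# Usage sketch for the LFU cache above; keys and values are arbitrary.
lfu = LFU(2)
lfu.set(1, 10)
lfu.set(2, 20)
lfu.get(1)      # key 1 now has more accesses than key 2
lfu.set(3, 30)  # capacity exceeded: evicts key 2, the tail of head_bucket
assert lfu.get(2) is None
assert lfu.get(1) == 10 and lfu.get(3) == 30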


test_lambda.py

Source: test_lambda.py (GitHub)


import unittest
from io import StringIO
from zipfile import ZipFile

import botocore
from botocore.stub import Stubber
import boto3
from testfixtures import TempDirectory, compare

from .. import aws_lambda

REGION = "us-east-1"
ALL_FILES = (
    'f1/f1.py',
    'f1/f1.pyc',
    'f1/__init__.py',
    'f1/test/__init__.py',
    'f1/test/f1.py',
    'f1/test/f1.pyc',
    'f1/test2/test.txt',
    'f2/f2.js'
)
F1_FILES = [p[3:] for p in ALL_FILES if p.startswith('f1')]
F2_FILES = [p[3:] for p in ALL_FILES if p.startswith('f2')]
BUCKET_NAME = "myBucket"


class TestLambdaHooks(unittest.TestCase):
    def setUp(self):
        self.s3 = boto3.client("s3")
        self.stubber = Stubber(self.s3)

    @classmethod
    def temp_directory_with_files(cls, files=ALL_FILES):
        d = TempDirectory()
        for f in files:
            d.write(f, b'')
        return d

    def assert_zip_file_list(self, zip_file, files):
        found_files = set()
        for zip_info in zip_file.infolist():
            perms = (
                zip_info.external_attr & aws_lambda.ZIP_PERMS_MASK
            ) >> 16
            self.assertIn(perms, (0o755, 0o644),
                          'ZIP member permission must be 755 or 644')
            found_files.add(zip_info.filename)
        compare(found_files, set(files))

    def assert_s3_zip_file_list(self, bucket, key, files):
        object_info = self.s3.get_object(Bucket=bucket, Key=key)
        zip_data = StringIO(object_info['Body'].read())
        with ZipFile(zip_data, 'r') as zip_file:
            self.assert_zip_file_list(zip_file, files)

    def test_ensure_bucket_bucket_exists(self):
        self.stubber.add_response("head_bucket", {})
        with self.stubber:
            aws_lambda._ensure_bucket(self.s3, BUCKET_NAME)

    def test_ensure_bucket_bucket_doesnt_exist_create_ok(self):
        self.stubber.add_client_error(
            "head_bucket",
            service_error_code=404,
            http_status_code=404
        )
        self.stubber.add_response(
            "create_bucket",
            {"Location": "/%s" % BUCKET_NAME}
        )
        with self.stubber:
            aws_lambda._ensure_bucket(self.s3, BUCKET_NAME)

    def test_ensure_bucket_bucket_doesnt_exist_access_denied(self):
        self.stubber.add_client_error(
            "head_bucket",
            service_error_code=401,
            http_status_code=401
        )
        with self.stubber:
            with self.assertRaises(botocore.exceptions.ClientError):
                aws_lambda._ensure_bucket(self.s3, BUCKET_NAME)

    def test_ensure_bucket_unhandled_error(self):
        self.stubber.add_client_error(
            "head_bucket",
            service_error_code=500,
            http_status_code=500
        )
        with self.stubber:
            with self.assertRaises(botocore.exceptions.ClientError) as cm:
                aws_lambda._ensure_bucket(self.s3, BUCKET_NAME)
            exc = cm.exception
            self.assertEqual(exc.response["Error"]["Code"], 500)

    # This should fail, your task is to figure out why and
    # make it pass.
    def test_upload_lambda_functions(self):
        # 1st call, file doesn't exist, so no return from head_object
        self.stubber.add_response("head_bucket", {})
        self.stubber.add_response("head_object", {})
        self.stubber.add_response("put_object", {})
        # 2nd call, file exists, so the hash is in the response to head_object
        # and since that should match the hash of the new file, it won't try
        # to call put_object
        self.stubber.add_response("head_bucket", {})
        self.stubber.add_response("head_object", {
            "ETag": '"d41d8cd98f00b204e9800998ecf8427e"'  # correct hash for the files, you can trust this, including extra quotes
        })
        # should not call put_object again, so no stubbing
        try:
            with self.temp_directory_with_files() as tmp_dir:
                with self.stubber:
                    aws_lambda.upload_lambda_functions(self.s3, BUCKET_NAME, "things", tmp_dir.path)
                    aws_lambda.upload_lambda_functions(self.s3, BUCKET_NAME, "things", tmp_dir.path)
        finally:
            tmp_dir.cleanup()


if __name__ == "__main__":
...
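
The Stubber pattern in these tests also works standalone. Here is a minimal, self-contained sketch that stubs one successful and one failing head_bucket call; the bucket name, region, and dummy credentials are arbitrary:

# Sketch of stubbing head_bucket with botocore's Stubber, so the test never
# talks to a real S3 or LocalStack endpoint.
import boto3
import botocore
from botocore.stub import Stubber

s3 = boto3.client(
    "s3",
    region_name="us-east-1",
    aws_access_key_id="test",       # dummy values; the stub answers the call
    aws_secret_access_key="test",
)
stubber = Stubber(s3)

# First call succeeds: head_bucket has no body, so an empty dict is a valid response.
stubber.add_response("head_bucket", {}, {"Bucket": "myBucket"})
# Second call fails the way S3 reports a missing bucket: a 404 client error.
stubber.add_client_error("head_bucket", service_error_code="404", http_status_code=404)

with stubber:
    s3.head_bucket(Bucket="myBucket")  # consumes the stubbed success
    try:
        s3.head_bucket(Bucket="myBucket")
    except botocore.exceptions.ClientError as e:
        assert e.response["Error"]["Code"] == "404"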


object_storage.py

Source: object_storage.py (GitHub)


...
    pass


class ObjectStorageClient(metaclass=abc.ABCMeta):
    """Just because the full S3 API is available doesn't mean we should use it all"""

    @abc.abstractmethod
    def head_bucket(self, bucket: str) -> bool:
        pass

    @abc.abstractmethod
    def read(self, bucket: str, key: str) -> Optional[str]:
        pass

    @abc.abstractmethod
    def read_bytes(self, bucket: str, key: str) -> Optional[bytes]:
        pass

    @abc.abstractmethod
    def write(self, bucket: str, key: str, content: Union[str, bytes]) -> None:
        pass


class UnavailableStorage(ObjectStorageClient):
    def head_bucket(self, bucket: str):
        return False

    def read(self, bucket: str, key: str) -> Optional[str]:
        pass

    def read_bytes(self, bucket: str, key: str) -> Optional[bytes]:
        pass

    def write(self, bucket: str, key: str, content: Union[str, bytes]) -> None:
        pass


class ObjectStorage(ObjectStorageClient):
    def __init__(self, aws_client) -> None:
        self.aws_client = aws_client

    def head_bucket(self, bucket: str) -> bool:
        try:
            return bool(self.aws_client.head_bucket(Bucket=bucket))
        except Exception as e:
            logger.warn("object_storage.health_check_failed", bucket=bucket, error=e)
            return False

    def read(self, bucket: str, key: str) -> Optional[str]:
        object_bytes = self.read_bytes(bucket, key)
        if object_bytes:
            return object_bytes.decode("utf-8")
        else:
            return None

    def read_bytes(self, bucket: str, key: str) -> Optional[bytes]:
        s3_response = {}
        try:
            s3_response = self.aws_client.get_object(Bucket=bucket, Key=key)
            return s3_response["Body"].read()
        except Exception as e:
            logger.error("object_storage.read_failed", bucket=bucket, file_name=key, error=e, s3_response=s3_response)
            capture_exception(e)
            raise ObjectStorageError("read failed") from e

    def write(self, bucket: str, key: str, content: Union[str, bytes]) -> None:
        s3_response = {}
        try:
            s3_response = self.aws_client.put_object(Bucket=bucket, Body=content, Key=key)
        except Exception as e:
            logger.error("object_storage.write_failed", bucket=bucket, file_name=key, error=e, s3_response=s3_response)
            capture_exception(e)
            raise ObjectStorageError("write failed") from e


_client: ObjectStorageClient = UnavailableStorage()


def object_storage_client() -> ObjectStorageClient:
    global _client
    if not settings.OBJECT_STORAGE_ENABLED:
        _client = UnavailableStorage()
    elif isinstance(_client, UnavailableStorage):
        _client = ObjectStorage(
            client(
                "s3",
                endpoint_url=settings.OBJECT_STORAGE_ENDPOINT,
                aws_access_key_id=settings.OBJECT_STORAGE_ACCESS_KEY_ID,
                aws_secret_access_key=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
                config=Config(signature_version="s3v4", connect_timeout=1, retries={"max_attempts": 1}),
                region_name="us-east-1",
            ),
        )
    return _client


def write(file_name: str, content: Union[str, bytes]) -> None:
    return object_storage_client().write(bucket=settings.OBJECT_STORAGE_BUCKET, key=file_name, content=content)


def read(file_name: str) -> Optional[str]:
    return object_storage_client().read(bucket=settings.OBJECT_STORAGE_BUCKET, key=file_name)


def read_bytes(file_name: str) -> Optional[bytes]:
    return object_storage_client().read_bytes(bucket=settings.OBJECT_STORAGE_BUCKET, key=file_name)


def health_check() -> bool:
...
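
ObjectStorage.head_bucket above is effectively a liveness probe: HEAD Bucket carries no payload, so it is a cheap way to check that storage is reachable before reads or writes. A brief sketch of the same pattern, again assuming a LocalStack endpoint on port 4566 and a placeholder bucket name:

# Sketch: head_bucket as a fail-fast health check, mirroring
# ObjectStorage.head_bucket above. Endpoint and bucket name are assumptions.
import boto3
from botocore.config import Config

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:4566",  # assumed LocalStack edge endpoint
    aws_access_key_id="test",
    aws_secret_access_key="test",
    region_name="us-east-1",
    config=Config(connect_timeout=1, retries={"max_attempts": 1}),  # fail fast
)

def storage_healthy(bucket: str) -> bool:
    # any failure (missing bucket, no access, endpoint down) reads as unhealthy
    try:
        return bool(s3.head_bucket(Bucket=bucket))
    except Exception:
        return False

print(storage_healthy("my-bucket"))  # False until the bucket exists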

