How to use _proxy method in localstack

Best Python code snippet using localstack_python

proxy.py

Source:proxy.py Github

copy

Full Screen

# ... (the beginning of proxy.py is elided in this excerpt)
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.


def _proxy(target, attr):
    """Build a property that forwards ``attr`` to another object.

    :param target: name of the attribute on ``self`` holding the wrapped
                   object (every class in this excerpt passes ``'base'``)
    :param attr: name of the attribute to read, write or delete on the
                 wrapped object
    :returns: a ``property`` whose getter, setter and deleter all operate
              on ``getattr(self, target)``
    """

    def get_attr(self):
        return getattr(getattr(self, target), attr)

    def set_attr(self, value):
        return setattr(getattr(self, target), attr, value)

    def del_attr(self):
        return delattr(getattr(self, target), attr)

    return property(get_attr, set_attr, del_attr)


class Helper(object):
    """Wrap objects in, and unwrap them from, an optional proxy class."""

    def __init__(self, proxy_class=None, proxy_kwargs=None):
        """Remember the proxy class and the keyword arguments to build it.

        :param proxy_class: callable invoked as
                            ``proxy_class(obj, **proxy_kwargs)``; ``None``
                            turns proxying off
        :param proxy_kwargs: extra keyword arguments for ``proxy_class``;
                             a falsy value is replaced by an empty dict
        """
        self.proxy_class = proxy_class
        self.proxy_kwargs = proxy_kwargs or {}

    def proxy(self, obj):
        """Return ``obj`` wrapped in the proxy class.

        ``obj`` is returned untouched when it is ``None`` or when no proxy
        class was configured.
        """
        if obj is None or self.proxy_class is None:
            return obj
        return self.proxy_class(obj, **self.proxy_kwargs)

    def unproxy(self, obj):
        """Return ``obj.base``, the object a proxy wraps.

        ``obj`` is returned untouched when it is ``None`` or when no proxy
        class was configured.
        """
        if obj is None or self.proxy_class is None:
            return obj
        return obj.base


class TaskRepo(object):
    """Task repository wrapper: proxies results, unproxies arguments."""

    def __init__(self, base,
                 task_proxy_class=None, task_proxy_kwargs=None):
        self.base = base
        self.task_proxy_helper = Helper(task_proxy_class, task_proxy_kwargs)

    def get(self, task_id):
        """Fetch a task from ``base`` and return it proxied."""
        task = self.base.get(task_id)
        return self.task_proxy_helper.proxy(task)

    # add/save/remove discard whatever ``base`` returns.
    def add(self, task):
        self.base.add(self.task_proxy_helper.unproxy(task))

    def save(self, task):
        self.base.save(self.task_proxy_helper.unproxy(task))

    def remove(self, task):
        base_task = self.task_proxy_helper.unproxy(task)
        self.base.remove(base_task)


class TaskStubRepo(object):
    """Task-stub repository wrapper exposing only ``list``."""

    def __init__(self, base, task_stub_proxy_class=None,
                 task_stub_proxy_kwargs=None):
        self.base = base
        self.task_stub_proxy_helper = Helper(task_stub_proxy_class,
                                             task_stub_proxy_kwargs)

    def list(self, *args, **kwargs):
        """Return ``base.list(...)`` with every element proxied."""
        tasks = self.base.list(*args, **kwargs)
        return [self.task_stub_proxy_helper.proxy(task) for task in tasks]


class Repo(object):
    """Generic repository wrapper.

    Items handed to ``base`` are unproxied first; everything ``base``
    returns is proxied before being handed back to the caller.
    """

    def __init__(self, base, item_proxy_class=None, item_proxy_kwargs=None):
        self.base = base
        self.helper = Helper(item_proxy_class, item_proxy_kwargs)

    def get(self, item_id):
        return self.helper.proxy(self.base.get(item_id))

    def list(self, *args, **kwargs):
        items = self.base.list(*args, **kwargs)
        return [self.helper.proxy(item) for item in items]

    def add(self, item):
        base_item = self.helper.unproxy(item)
        result = self.base.add(base_item)
        return self.helper.proxy(result)

    def save(self, item, from_state=None):
        base_item = self.helper.unproxy(item)
        result = self.base.save(base_item, from_state=from_state)
        return self.helper.proxy(result)

    def remove(self, item):
        base_item = self.helper.unproxy(item)
        result = self.base.remove(base_item)
        return self.helper.proxy(result)


class MemberRepo(object):
    """Image-member repository wrapper."""

    def __init__(self, image, base,
                 member_proxy_class=None, member_proxy_kwargs=None):
        # ``image`` is stored but not read by any method in this excerpt.
        self.image = image
        self.base = base
        self.member_proxy_helper = Helper(member_proxy_class,
                                          member_proxy_kwargs)

    def get(self, member_id):
        member = self.base.get(member_id)
        return self.member_proxy_helper.proxy(member)

    def add(self, member):
        # Unlike remove/save, the result of ``base.add`` is not returned.
        self.base.add(self.member_proxy_helper.unproxy(member))

    def list(self, *args, **kwargs):
        members = self.base.list(*args, **kwargs)
        return [self.member_proxy_helper.proxy(member)
                for member in members]

    def remove(self, member):
        base_item = self.member_proxy_helper.unproxy(member)
        result = self.base.remove(base_item)
        return self.member_proxy_helper.proxy(result)

    def save(self, member, from_state=None):
        base_item = self.member_proxy_helper.unproxy(member)
        result = self.base.save(base_item, from_state=from_state)
        return self.member_proxy_helper.proxy(result)


class ImageFactory(object):
    """Factory wrapper that proxies the images ``base`` creates."""

    def __init__(self, base, proxy_class=None, proxy_kwargs=None):
        self.helper = Helper(proxy_class, proxy_kwargs)
        self.base = base

    def new_image(self, **kwargs):
        return self.helper.proxy(self.base.new_image(**kwargs))


class ImageMembershipFactory(object):
    """Factory wrapper that proxies the image members ``base`` creates."""

    def __init__(self, base, proxy_class=None, proxy_kwargs=None):
        self.helper = Helper(proxy_class, proxy_kwargs)
        self.base = base

    def new_image_member(self, image, member, **kwargs):
        return self.helper.proxy(self.base.new_image_member(image,
                                                            member,
                                                            **kwargs))


class Image(object):
    """Image proxy: attributes and methods are forwarded to ``base``."""

    def __init__(self, base, member_repo_proxy_class=None,
                 member_repo_proxy_kwargs=None):
        self.base = base
        # Not used by any method visible in this excerpt.
        self.helper = Helper(member_repo_proxy_class,
                             member_repo_proxy_kwargs)

    name = _proxy('base', 'name')
    image_id = _proxy('base', 'image_id')
    status = _proxy('base', 'status')
    created_at = _proxy('base', 'created_at')
    updated_at = _proxy('base', 'updated_at')
    visibility = _proxy('base', 'visibility')
    min_disk = _proxy('base', 'min_disk')
    min_ram = _proxy('base', 'min_ram')
    protected = _proxy('base', 'protected')
    locations = _proxy('base', 'locations')
    checksum = _proxy('base', 'checksum')
    owner = _proxy('base', 'owner')
    disk_format = _proxy('base', 'disk_format')
    container_format = _proxy('base', 'container_format')
    size = _proxy('base', 'size')
    virtual_size = _proxy('base', 'virtual_size')
    extra_properties = _proxy('base', 'extra_properties')
    tags = _proxy('base', 'tags')

    def delete(self):
        self.base.delete()

    def deactivate(self):
        self.base.deactivate()

    def reactivate(self):
        self.base.reactivate()

    def set_data(self, data, size=None):
        self.base.set_data(data, size)

    def get_data(self, *args, **kwargs):
        return self.base.get_data(*args, **kwargs)


class ImageMember(object):
    """Image-member proxy: attributes are forwarded to ``base``."""

    def __init__(self, base):
        self.base = base

    id = _proxy('base', 'id')
    image_id = _proxy('base', 'image_id')
    member_id = _proxy('base', 'member_id')
    status = _proxy('base', 'status')
    created_at = _proxy('base', 'created_at')
    updated_at = _proxy('base', 'updated_at')


class Task(object):
    """Task proxy: attributes and methods are forwarded to ``base``."""

    def __init__(self, base):
        self.base = base

    task_id = _proxy('base', 'task_id')
    type = _proxy('base', 'type')
    status = _proxy('base', 'status')
    owner = _proxy('base', 'owner')
    expires_at = _proxy('base', 'expires_at')
    created_at = _proxy('base', 'created_at')
    updated_at = _proxy('base', 'updated_at')
    task_input = _proxy('base', 'task_input')
    result = _proxy('base', 'result')
    message = _proxy('base', 'message')

    def begin_processing(self):
        self.base.begin_processing()

    def succeed(self, result):
        self.base.succeed(result)

    def fail(self, message):
        self.base.fail(message)

    def run(self, executor):
        self.base.run(executor)


class TaskStub(object):
    """Task-stub proxy: attributes are forwarded to ``base``."""

    def __init__(self, base):
        self.base = base

    task_id = _proxy('base', 'task_id')
    type = _proxy('base', 'type')
    status = _proxy('base', 'status')
    owner = _proxy('base', 'owner')
    expires_at = _proxy('base', 'expires_at')
    created_at = _proxy('base', 'created_at')
    updated_at = _proxy('base', 'updated_at')


class TaskFactory(object):
    """Factory wrapper that proxies the tasks ``base`` creates."""

    def __init__(self,
                 base,
                 task_proxy_class=None,
                 task_proxy_kwargs=None):
        self.task_helper = Helper(task_proxy_class, task_proxy_kwargs)
        self.base = base

    def new_task(self, **kwargs):
        task = self.base.new_task(**kwargs)
        return self.task_helper.proxy(task)


# Metadef Namespace classes
class MetadefNamespaceRepo(object):
    """Metadef-namespace repository wrapper."""

    def __init__(self, base,
                 namespace_proxy_class=None, namespace_proxy_kwargs=None):
        self.base = base
        self.namespace_proxy_helper = Helper(namespace_proxy_class,
                                             namespace_proxy_kwargs)

    def get(self, namespace):
        namespace_obj = self.base.get(namespace)
        return self.namespace_proxy_helper.proxy(namespace_obj)

    def add(self, namespace):
        # The result of ``base.add`` is not returned.
        self.base.add(self.namespace_proxy_helper.unproxy(namespace))

    def list(self, *args, **kwargs):
        namespaces = self.base.list(*args, **kwargs)
        return [self.namespace_proxy_helper.proxy(namespace)
                for namespace in namespaces]

    def remove(self, item):
        base_item = self.namespace_proxy_helper.unproxy(item)
        result = self.base.remove(base_item)
        return self.namespace_proxy_helper.proxy(result)

    def remove_objects(self, item):
        base_item = self.namespace_proxy_helper.unproxy(item)
        result = self.base.remove_objects(base_item)
        return self.namespace_proxy_helper.proxy(result)

    def remove_properties(self, item):
        base_item = self.namespace_proxy_helper.unproxy(item)
        result = self.base.remove_properties(base_item)
        return self.namespace_proxy_helper.proxy(result)

    def remove_tags(self, item):
        base_item = self.namespace_proxy_helper.unproxy(item)
        result = self.base.remove_tags(base_item)
        return self.namespace_proxy_helper.proxy(result)

    def save(self, item):
        base_item = self.namespace_proxy_helper.unproxy(item)
        result = self.base.save(base_item)
        return self.namespace_proxy_helper.proxy(result)


class MetadefNamespace(object):
    """Metadef-namespace proxy: attributes are forwarded to ``base``."""

    def __init__(self, base):
        self.base = base

    namespace_id = _proxy('base', 'namespace_id')
    namespace = _proxy('base', 'namespace')
    display_name = _proxy('base', 'display_name')
    description = _proxy('base', 'description')
    owner = _proxy('base', 'owner')
    visibility = _proxy('base', 'visibility')
    protected = _proxy('base', 'protected')
    created_at = _proxy('base', 'created_at')
    updated_at = _proxy('base', 'updated_at')

    def delete(self):
        self.base.delete()


class MetadefNamespaceFactory(object):
    """Factory wrapper that proxies the namespaces ``base`` creates."""

    def __init__(self,
                 base,
                 meta_namespace_proxy_class=None,
                 meta_namespace_proxy_kwargs=None):
        self.meta_namespace_helper = Helper(meta_namespace_proxy_class,
                                            meta_namespace_proxy_kwargs)
        self.base = base

    def new_namespace(self, **kwargs):
        namespace = self.base.new_namespace(**kwargs)
        return self.meta_namespace_helper.proxy(namespace)


# Metadef object classes
class MetadefObjectRepo(object):
    """Metadef-object repository wrapper."""

    def __init__(self, base,
                 object_proxy_class=None, object_proxy_kwargs=None):
        self.base = base
        self.object_proxy_helper = Helper(object_proxy_class,
                                          object_proxy_kwargs)

    def get(self, namespace, object_name):
        meta_object = self.base.get(namespace, object_name)
        return self.object_proxy_helper.proxy(meta_object)

    def add(self, meta_object):
        # The result of ``base.add`` is not returned.
        self.base.add(self.object_proxy_helper.unproxy(meta_object))

    def list(self, *args, **kwargs):
        objects = self.base.list(*args, **kwargs)
        return [self.object_proxy_helper.proxy(meta_object)
                for meta_object in objects]

    def remove(self, item):
        base_item = self.object_proxy_helper.unproxy(item)
        result = self.base.remove(base_item)
        return self.object_proxy_helper.proxy(result)

    def save(self, item):
        base_item = self.object_proxy_helper.unproxy(item)
        result = self.base.save(base_item)
        return self.object_proxy_helper.proxy(result)


class MetadefObject(object):
    """Metadef-object proxy: attributes are forwarded to ``base``."""

    def __init__(self, base):
        self.base = base

    namespace = _proxy('base', 'namespace')
    object_id = _proxy('base', 'object_id')
    name = _proxy('base', 'name')
    required = _proxy('base', 'required')
    description = _proxy('base', 'description')
    properties = _proxy('base', 'properties')
    created_at = _proxy('base', 'created_at')
    updated_at = _proxy('base', 'updated_at')

    def delete(self):
        self.base.delete()


class MetadefObjectFactory(object):
    """Factory wrapper that proxies the metadef objects ``base`` creates."""

    def __init__(self,
                 base,
                 meta_object_proxy_class=None,
                 meta_object_proxy_kwargs=None):
        self.meta_object_helper = Helper(meta_object_proxy_class,
                                         meta_object_proxy_kwargs)
        self.base = base

    def new_object(self, **kwargs):
        meta_object = self.base.new_object(**kwargs)
        return self.meta_object_helper.proxy(meta_object)


# Metadef ResourceType classes
class MetadefResourceTypeRepo(object):
    """Metadef-resource-type repository wrapper (no ``save`` method)."""

    def __init__(self, base, resource_type_proxy_class=None,
                 resource_type_proxy_kwargs=None):
        self.base = base
        self.resource_type_proxy_helper = Helper(resource_type_proxy_class,
                                                 resource_type_proxy_kwargs)

    def add(self, meta_resource_type):
        # The result of ``base.add`` is not returned.
        self.base.add(self.resource_type_proxy_helper.unproxy(
            meta_resource_type))

    def get(self, *args, **kwargs):
        resource_type = self.base.get(*args, **kwargs)
        return self.resource_type_proxy_helper.proxy(resource_type)

    def list(self, *args, **kwargs):
        resource_types = self.base.list(*args, **kwargs)
        return [self.resource_type_proxy_helper.proxy(resource_type)
                for resource_type in resource_types]

    def remove(self, item):
        base_item = self.resource_type_proxy_helper.unproxy(item)
        result = self.base.remove(base_item)
        return self.resource_type_proxy_helper.proxy(result)


class MetadefResourceType(object):
    """Metadef-resource-type proxy: attributes are forwarded to ``base``."""

    def __init__(self, base):
        self.base = base

    namespace = _proxy('base', 'namespace')
    name = _proxy('base', 'name')
    prefix = _proxy('base', 'prefix')
    properties_target = _proxy('base', 'properties_target')
    created_at = _proxy('base', 'created_at')
    updated_at = _proxy('base', 'updated_at')

    def delete(self):
        self.base.delete()


class MetadefResourceTypeFactory(object):
    """Factory wrapper that proxies the resource types ``base`` creates."""

    def __init__(self,
                 base,
                 resource_type_proxy_class=None,
                 resource_type_proxy_kwargs=None):
        self.resource_type_helper = Helper(resource_type_proxy_class,
                                           resource_type_proxy_kwargs)
        self.base = base

    def new_resource_type(self, **kwargs):
        resource_type = self.base.new_resource_type(**kwargs)
        return self.resource_type_helper.proxy(resource_type)


# Metadef namespace property classes
class MetadefPropertyRepo(object):
    """Metadef-property repository wrapper."""

    def __init__(self, base,
                 property_proxy_class=None, property_proxy_kwargs=None):
        self.base = base
        self.property_proxy_helper = Helper(property_proxy_class,
                                            property_proxy_kwargs)

    def get(self, namespace, property_name):
        # Local deliberately not called ``property``: that would shadow
        # the builtin.
        namespace_property = self.base.get(namespace, property_name)
        return self.property_proxy_helper.proxy(namespace_property)

    def add(self, property):
        # The parameter name is part of the public signature and is kept.
        # The result of ``base.add`` is not returned.
        self.base.add(self.property_proxy_helper.unproxy(property))

    def list(self, *args, **kwargs):
        properties = self.base.list(*args, **kwargs)
        return [self.property_proxy_helper.proxy(namespace_property)
                for namespace_property in properties]

    def remove(self, item):
        base_item = self.property_proxy_helper.unproxy(item)
        result = self.base.remove(base_item)
        return self.property_proxy_helper.proxy(result)

    def save(self, item):
        base_item = self.property_proxy_helper.unproxy(item)
        result = self.base.save(base_item)
        return self.property_proxy_helper.proxy(result)


class MetadefProperty(object):
    """Metadef-property proxy: attributes are forwarded to ``base``."""

    def __init__(self, base):
        self.base = base

    namespace = _proxy('base', 'namespace')
    property_id = _proxy('base', 'property_id')
    name = _proxy('base', 'name')
    schema = _proxy('base', 'schema')

    def delete(self):
        self.base.delete()


class MetadefPropertyFactory(object):
    """Factory wrapper that proxies the properties ``base`` creates."""

    def __init__(self,
                 base,
                 property_proxy_class=None,
                 property_proxy_kwargs=None):
        # The attribute is named ``meta_object_helper`` although it holds
        # the property helper; the name is kept because it is visible to
        # callers.
        self.meta_object_helper = Helper(property_proxy_class,
                                         property_proxy_kwargs)
        self.base = base

    def new_namespace_property(self, **kwargs):
        namespace_property = self.base.new_namespace_property(**kwargs)
        return self.meta_object_helper.proxy(namespace_property)


# Metadef tag classes
class MetadefTagRepo(object):
    """Metadef-tag repository wrapper."""

    def __init__(self, base,
                 tag_proxy_class=None, tag_proxy_kwargs=None):
        self.base = base
        self.tag_proxy_helper = Helper(tag_proxy_class,
                                       tag_proxy_kwargs)

    def get(self, namespace, name):
        meta_tag = self.base.get(namespace, name)
        return self.tag_proxy_helper.proxy(meta_tag)

    def add(self, meta_tag):
        # The result of ``base.add`` is not returned.
        self.base.add(self.tag_proxy_helper.unproxy(meta_tag))

    def add_tags(self, meta_tags):
        """Unproxy every tag and pass them to ``base.add_tags`` as a list.

        The result of ``base.add_tags`` is not returned.
        """
        tags_list = [self.tag_proxy_helper.unproxy(meta_tag)
                     for meta_tag in meta_tags]
        self.base.add_tags(tags_list)

    def list(self, *args, **kwargs):
        tags = self.base.list(*args, **kwargs)
        return [self.tag_proxy_helper.proxy(meta_tag) for meta_tag in tags]

    def remove(self, item):
        base_item = self.tag_proxy_helper.unproxy(item)
        result = self.base.remove(base_item)
        return self.tag_proxy_helper.proxy(result)

    def save(self, item):
        base_item = self.tag_proxy_helper.unproxy(item)
        result = self.base.save(base_item)
        return self.tag_proxy_helper.proxy(result)


class MetadefTag(object):
    """Metadef-tag proxy: attributes are forwarded to ``base``."""

    def __init__(self, base):
        self.base = base

    namespace = _proxy('base', 'namespace')
    tag_id = _proxy('base', 'tag_id')
    name = _proxy('base', 'name')
    created_at = _proxy('base', 'created_at')
    updated_at = _proxy('base', 'updated_at')

    def delete(self):
        self.base.delete()


class MetadefTagFactory(object):
    """Factory wrapper around ``base`` for metadef tags."""

    def __init__(self,
                 base,
                 meta_tag_proxy_class=None,
                 meta_tag_proxy_kwargs=None):
        self.meta_tag_helper = Helper(meta_tag_proxy_class,
                                      meta_tag_proxy_kwargs)
        self.base = base

    def new_tag(self, **kwargs):
        t = self.base.new_tag(**kwargs)
        # ... (the excerpt is cut off here; the rest of this method is not
        # shown. Presumably it returns self.meta_tag_helper.proxy(t), as
        # the sibling factories do -- TODO confirm against the full file.)

Full Screen

Full Screen

test_serialproxy.py

Source:test_serialproxy.py Github

copy

Full Screen

...45 socket.SO_REUSEADDR,46 1)47 fake_socket.bind.assert_called_once_with((mock.sentinel.host,48 mock.sentinel.port))49 def test_stop_serial_proxy(self):50 self._proxy._conn = mock.Mock()51 self._proxy._sock = mock.Mock()52 self._proxy.stop()53 self._proxy._stopped.set.assert_called_once_with()54 self._proxy._client_connected.clear.assert_called_once_with()55 self._proxy._conn.shutdown.assert_called_once_with(socket.SHUT_RDWR)56 self._proxy._conn.close.assert_called_once_with()57 self._proxy._sock.close.assert_called_once_with()58 @mock.patch.object(serialproxy.SerialProxy, '_accept_conn')59 @mock.patch.object(serialproxy.SerialProxy, '_setup_socket')60 def test_run(self, mock_setup_socket, mock_accept_con):61 self._proxy._stopped = mock.MagicMock()62 self._proxy._stopped.isSet.side_effect = [False, True]63 self._proxy.run()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. It takes you from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful