How to use the is_enum method in localstack

Best Python code snippet using localstack_python

test_models.py

Source:test_models.py Github

copy

Full Screen

1from django.test import TestCase2from django.core.exceptions import ValidationError3import pytest4from catalogue.models import CatalogueItem5from downloader.executors.athena import AthenaExecutor6from tests.factory import EntityFactory7ef = EntityFactory()8class CatalogueItemTestCase(TestCase):9 @pytest.fixture(autouse=True)10 def initfixtures(self, mocker):11 self.mocker = mocker12 def setUp(self):13 ef.clear()14 def test_simple_create(self):15 a = ef.account()16 spec = [17 {18 'name': 'location',19 'type': 'STRING',20 'size': 190234,21 'is_nullable': False,22 'is_enum': False,23 'distribution': None,24 },25 {26 'name': 'value',27 'type': 'FLOAT',28 'size': None,29 'is_nullable': True,30 'is_enum': True,31 'distribution': [32 {'value': 18.0, 'count': 9},33 {'value': 19.1, 'count': 45},34 {'value': 21.2, 'count': 10},35 {'value': None, 'count': 190},36 ],37 },38 ]39 ci = CatalogueItem.objects.create(40 maintained_by=a,41 name='weather_temperatures',42 sample=[43 {'location': 'Warsaw', 'value': 19.2},44 {'location': 'Wroclaw', 'value': 17.9},45 ],46 spec=spec,47 executor_type='DATABRICKS')48 assert ci.created_datetime is not None49 assert ci.updated_datetime is not None50 assert ci.created_by is None51 assert ci.updated_by is None52 assert ci.maintained_by == a53 assert ci.name == 'weather_temperatures'54 assert ci.sample == [55 {'location': 'Warsaw', 'value': 19.2},56 {'location': 'Wroclaw', 'value': 17.9},57 ]58 assert ci.spec == spec59 def test_invalid__name_should_be_unique(self):60 ef.catalogue_item(name='feature')61 with pytest.raises(ValidationError) as e:62 ef.catalogue_item(name='feature')63 assert e.value.message_dict == {64 'name': ['Catalogue item with this Name already exists.'],65 }66 def test_invalid__broken_spec_schema__missing_fields(self):67 a = ef.account()68 spec = [69 {70 'type': 'STRING',71 'size': 190234,72 'is_nullable': False,73 'is_enum': False,74 'distribution': None,75 },76 ]77 with pytest.raises(ValidationError) as e:78 
CatalogueItem.objects.create(79 maintained_by=a,80 name='weather_temperatures',81 sample=[],82 spec=spec,83 executor_type='DATABRICKS')84 assert e.value.message_dict == {85 'spec': [86 "JSON did not validate. PATH: '0' REASON: 'name' is a "87 "required property",88 ],89 }90 def test_invalid__broken_spec_schema__wrong_type(self):91 a = ef.account()92 spec = [93 {94 'name': 'price',95 'type': 'STRING',96 'size': '190234',97 'is_nullable': False,98 'is_enum': False,99 'distribution': None,100 },101 ]102 with pytest.raises(ValidationError) as e:103 CatalogueItem.objects.create(104 maintained_by=a,105 name='weather_temperatures',106 sample=[],107 spec=spec,108 executor_type='DATABRICKS')109 assert e.value.message_dict == {110 'spec': [111 "JSON did not validate. PATH: '0.size' REASON: '190234' "112 "is not valid under any of the given schemas",113 ],114 }115 def test_invalid__broken_spec_distribution_types(self):116 a = ef.account()117 spec = [118 {119 'name': 'value',120 'type': 'FLOAT',121 'size': None,122 'is_nullable': True,123 'is_enum': True,124 'distribution': [125 {'value': 18.0, 'count': 9},126 {'value': '19', 'count': 45},127 {'value': 21.2, 'count': 10},128 ],129 },130 ]131 with pytest.raises(ValidationError) as e:132 CatalogueItem.objects.create(133 maintained_by=a,134 name='weather_temperatures',135 sample=[],136 spec=spec,137 executor_type='DATABRICKS')138 assert e.value.message_dict == {139 'spec': [140 "column type and distribution value type mismatch detected "141 "for column 'value'",142 ],143 }144 def test_invalid__broken_spec_distribution__values_not_unique(self):145 a = ef.account()146 spec = [147 {148 'name': 'value',149 'type': 'FLOAT',150 'size': None,151 'is_nullable': True,152 'is_enum': True,153 'distribution': [154 {'value': 19.0, 'count': 9},155 {'value': 19.0, 'count': 45},156 {'value': 21.2, 'count': 10},157 ],158 },159 ]160 with pytest.raises(ValidationError) as e:161 CatalogueItem.objects.create(162 maintained_by=a,163 
name='weather_temperatures',164 sample=[],165 spec=spec,166 executor_type='DATABRICKS')167 assert e.value.message_dict == {168 'spec': [169 "not unique distribution values for column 'value' detected",170 ],171 }172 def test_invalid__broken_spec_distribution__counts_not_integers(self):173 a = ef.account()174 spec = [175 {176 'name': 'value',177 'type': 'FLOAT',178 'size': None,179 'is_nullable': True,180 'is_enum': True,181 'distribution': [182 {'value': 19.0, 'count': 45.9},183 {'value': 21.2, 'count': 10},184 ],185 },186 ]187 with pytest.raises(ValidationError) as e:188 CatalogueItem.objects.create(189 maintained_by=a,190 name='weather_temperatures',191 sample=[],192 spec=spec,193 executor_type='DATABRICKS')194 assert e.value.message_dict == {195 'spec': [196 "not integers distribution counts for column 'value' detected",197 ],198 }199 def test_invalid__broken_sample_not_same_names(self):200 a = ef.account()201 spec = [202 {203 'name': 'name',204 'type': 'STRING',205 'size': None,206 'is_nullable': True,207 'is_enum': True,208 'distribution': None,209 },210 {211 'name': 'price',212 'type': 'FLOAT',213 'size': None,214 'is_nullable': True,215 'is_enum': True,216 'distribution': None,217 },218 ]219 with pytest.raises(ValidationError) as e:220 CatalogueItem.objects.create(221 maintained_by=a,222 name='weather_temperatures',223 sample=[224 {'name': 'apple'},225 {'name': 'juice', 'price': 16},226 ],227 spec=spec,228 executor_type='DATABRICKS')229 assert e.value.message_dict == {230 '__all__': [231 'Sample column names and spec names are not identical',232 ],233 }234 def test_invalid__broken_sample_broken_types(self):235 a = ef.account()236 spec = [237 {238 'name': 'name',239 'type': 'STRING',240 'size': None,241 'is_nullable': True,242 'is_enum': True,243 'distribution': None,244 },245 {246 'name': 'price',247 'type': 'FLOAT',248 'size': None,249 'is_nullable': True,250 'is_enum': True,251 'distribution': None,252 },253 ]254 with pytest.raises(ValidationError) as 
e:255 CatalogueItem.objects.create(256 maintained_by=a,257 name='weather_temperatures',258 sample=[259 {'name': 'apple', 'price': 'free'},260 {'name': 'juice', 'price': 16},261 ],262 spec=spec,263 executor_type='DATABRICKS')264 assert e.value.message_dict == {265 '__all__': [266 "column type and sample value type mismatch detected for row "267 "number 0 column 'price'",268 ],269 }270 #271 # DATABASE272 #273 def test_database(self):274 a = ef.account()275 ci = CatalogueItem.objects.create(276 maintained_by=a,277 name='iot.events',278 sample=[{'value': 189}],279 spec=[280 {281 'name': 'value',282 'type': 'INTEGER',283 'size': None,284 'is_nullable': False,285 'is_enum': False,286 'distribution': None,287 },288 ],289 executor_type='ATHENA')290 assert ci.database == 'iot'291 #292 # TABLE293 #294 def test_table(self):295 a = ef.account()296 ci = CatalogueItem.objects.create(297 maintained_by=a,298 name='iot.events',299 sample=[{'value': 189}],300 spec=[301 {302 'name': 'value',303 'type': 'INTEGER',304 'size': None,305 'is_nullable': False,306 'is_enum': False,307 'distribution': None,308 },309 ],310 executor_type='ATHENA')311 assert ci.table == 'iot.events'312 #313 # UPDATE_SAMPLES_AND_DISTRIBUTIONS314 #315 def test_update_samples_and_distributions(self):316 a = ef.account()317 ci = CatalogueItem.objects.create(318 maintained_by=a,319 name='iot.events',320 sample=[],321 spec=[322 {323 'name': 'name',324 'type': 'STRING',325 'size': None,326 'is_nullable': False,327 'is_enum': False,328 'distribution': None,329 },330 {331 'name': 'value',332 'type': 'INTEGER',333 'size': None,334 'is_nullable': False,335 'is_enum': False,336 'distribution': None,337 },338 ],339 executor_type='ATHENA')340 get_sample = self.mocker.patch.object(AthenaExecutor, 'get_sample')341 get_sample.return_value = [342 {'name': 'temperature', 'value': 381},343 {'name': 'pressure', 'value': 13},344 ]345 get_size = self.mocker.patch.object(AthenaExecutor, 'get_size')346 get_size.side_effect = [678, 
789]347 get_distribution = self.mocker.patch.object(348 AthenaExecutor, 'get_distribution')349 get_distribution.side_effect = [350 [{'value': 'temperature', 'count': 19}],351 [{'value': 233, 'count': 567}, {'value': 45, 'count': 123}],352 ]353 ci.update_samples_and_distributions()354 ci.refresh_from_db()355 assert ci.sample == [356 {'name': 'temperature', 'value': 381},357 {'name': 'pressure', 'value': 13},358 ]359 assert ci.spec == [360 {361 'name': 'name',362 'type': 'STRING',363 'size': 678,364 'is_nullable': False,365 'is_enum': False,366 'distribution': [{'value': 'temperature', 'count': 19}],367 },368 {369 'name': 'value',370 'type': 'INTEGER',371 'size': 789,372 'is_nullable': False,373 'is_enum': False,374 'distribution': [375 {'value': 233, 'count': 567},376 {'value': 45, 'count': 123},377 ],378 },...

Full Screen

Full Screen

binary.py

Source:binary.py Github

copy

Full Screen

1# Licensed to the Apache Software Foundation (ASF) under one or more2# contributor license agreements. See the NOTICE file distributed with3# this work for additional information regarding copyright ownership.4# The ASF licenses this file to You under the Apache License, Version 2.05# (the "License"); you may not use this file except in compliance with6# the License. You may obtain a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13# See the License for the specific language governing permissions and14# limitations under the License.15from typing import Union16from pyignite.constants import *17from pyignite.datatypes.binary import (18 body_struct, enum_struct, schema_struct, binary_fields_struct,19)20from pyignite.datatypes import String, Int, Bool21from pyignite.queries import Query, Response22from pyignite.queries.op_codes import *23from pyignite.utils import int_overflow, entity_id24from .result import APIResult25def get_binary_type(26 connection: 'Connection', binary_type: Union[str, int], query_id=None,27) -> APIResult:28 """29 Gets the binary type information by type ID.30 :param connection: connection to Ignite server,31 :param binary_type: binary type name or ID,32 :param query_id: (optional) a value generated by client and returned as-is33 in response.query_id. 
When the parameter is omitted, a random value34 is generated,35 :return: API result data object.36 """37 query_struct = Query(38 OP_GET_BINARY_TYPE,39 [40 ('type_id', Int),41 ],42 query_id=query_id,43 )44 _, send_buffer = query_struct.from_python({45 'type_id': entity_id(binary_type),46 })47 connection.send(send_buffer)48 response_head_struct = Response([49 ('type_exists', Bool),50 ])51 response_head_type, recv_buffer = response_head_struct.parse(connection)52 response_head = response_head_type.from_buffer_copy(recv_buffer)53 response_parts = []54 if response_head.type_exists:55 resp_body_type, resp_body_buffer = body_struct.parse(connection)56 response_parts.append(('body', resp_body_type))57 resp_body = resp_body_type.from_buffer_copy(resp_body_buffer)58 recv_buffer += resp_body_buffer59 if resp_body.is_enum:60 resp_enum, resp_enum_buffer = enum_struct.parse(connection)61 response_parts.append(('enums', resp_enum))62 recv_buffer += resp_enum_buffer63 resp_schema_type, resp_schema_buffer = schema_struct.parse(connection)64 response_parts.append(('schema', resp_schema_type))65 recv_buffer += resp_schema_buffer66 response_class = type(67 'GetBinaryTypeResponse',68 (response_head_type,),69 {70 '_pack_': 1,71 '_fields_': response_parts,72 }73 )74 response = response_class.from_buffer_copy(recv_buffer)75 result = APIResult(response)76 if result.status != 0:77 return result78 result.value = {79 'type_exists': response.type_exists80 }81 if hasattr(response, 'body'):82 result.value.update(body_struct.to_python(response.body))83 if hasattr(response, 'enums'):84 result.value['enums'] = enum_struct.to_python(response.enums)85 if hasattr(response, 'schema'):86 result.value['schema'] = {87 x['schema_id']: [88 z['schema_field_id'] for z in x['schema_fields']89 ]90 for x in schema_struct.to_python(response.schema)91 }92 return result93def put_binary_type(94 connection: 'Connection', type_name: str, affinity_key_field: str=None,95 is_enum=False, schema: dict=None, 
query_id=None,96) -> APIResult:97 """98 Registers binary type information in cluster.99 :param connection: connection to Ignite server,100 :param type_name: name of the data type being registered,101 :param affinity_key_field: (optional) name of the affinity key field,102 :param is_enum: (optional) register enum if True, binary object otherwise.103 Defaults to False,104 :param schema: (optional) when register enum, pass a dict of enumerated105 parameter names as keys and an integers as values. When register binary106 type, pass a dict of field names: field types. Binary type with no fields107 is OK,108 :param query_id: (optional) a value generated by client and returned as-is109 in response.query_id. When the parameter is omitted, a random value110 is generated,111 :return: API result data object.112 """113 # prepare data114 if schema is None:115 schema = {}116 type_id = entity_id(type_name)117 data = {118 'type_name': type_name,119 'type_id': type_id,120 'affinity_key_field': affinity_key_field,121 'binary_fields': [],122 'is_enum': is_enum,123 'schema': [],124 }125 schema_id = None126 if is_enum:127 data['enums'] = []128 for literal, ordinal in schema.items():129 data['enums'].append({130 'literal': literal,131 'type_id': ordinal,132 })133 else:134 # assemble schema and calculate schema ID in one go135 schema_id = FNV1_OFFSET_BASIS if schema else 0136 for field_name, data_type in schema.items():137 # TODO: check for allowed data types138 field_id = entity_id(field_name)139 data['binary_fields'].append({140 'field_name': field_name,141 'type_id': int.from_bytes(142 data_type.type_code,143 byteorder=PROTOCOL_BYTE_ORDER144 ),145 'field_id': field_id,146 })147 schema_id ^= (field_id & 0xff)148 schema_id = int_overflow(schema_id * FNV1_PRIME)149 schema_id ^= ((field_id >> 8) & 0xff)150 schema_id = int_overflow(schema_id * FNV1_PRIME)151 schema_id ^= ((field_id >> 16) & 0xff)152 schema_id = int_overflow(schema_id * FNV1_PRIME)153 schema_id ^= ((field_id >> 24) & 
0xff)154 schema_id = int_overflow(schema_id * FNV1_PRIME)155 data['schema'].append({156 'schema_id': schema_id,157 'schema_fields': [158 {'schema_field_id': entity_id(x)} for x in schema159 ],160 })161 # do query162 if is_enum:163 query_struct = Query(164 OP_PUT_BINARY_TYPE,165 [166 ('type_id', Int),167 ('type_name', String),168 ('affinity_key_field', String),169 ('binary_fields', binary_fields_struct),170 ('is_enum', Bool),171 ('enums', enum_struct),172 ('schema', schema_struct),173 ],174 query_id=query_id,175 )176 else:177 query_struct = Query(178 OP_PUT_BINARY_TYPE,179 [180 ('type_id', Int),181 ('type_name', String),182 ('affinity_key_field', String),183 ('binary_fields', binary_fields_struct),184 ('is_enum', Bool),185 ('schema', schema_struct),186 ],187 query_id=query_id,188 )189 result = query_struct.perform(connection, query_params=data)190 if result.status == 0:191 result.value = {192 'type_id': type_id,193 'schema_id': schema_id,194 }...

Full Screen

Full Screen

FieldDescriptor.py

Source:FieldDescriptor.py Github

copy

Full Screen

1from Exceptions import FieldNameException2import keyword3class FieldDescriptor:4 def __init__(self, message_name, specifying, field_type, name, index, subtypes=None, is_enum=False):5 self.message_name = message_name6 self.specifying = specifying7 self.field_type = field_type8 self.name = name9 self.check_name()10 self.index = index11 self.subtypes = subtypes if subtypes is not None else None12 self.is_enum = is_enum13 def __str__(self):14 result = '{{"name": "{name}", ' \15 '"type": {type}, ' \16 '"subtypes": {subtypes}, ' \17 '"index": {index}, ' \18 '"is_enum": {is_enum}}}'.format(name=self.name,19 type=self.field_type,20 index=self.index,21 subtypes=self.subtypes,22 is_enum=self.is_enum)23 return result24 def check_name(self):25 if self.name in keyword.kwlist:26 raise FieldNameException(self.message_name,27 description="This name is keyword for python",28 name=self.name)29 if not self.name[0].isalpha():30 raise FieldNameException(self.message_name,31 description="Name must start with alpha",32 name=self.name)33 if self.name.find("-") != -1:34 raise FieldNameException(self.message_name,35 description="Name can't contain '-'",...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful