Best Python code snippet using localstack_python
test_models.py
Source:test_models.py  
1from django.test import TestCase2from django.core.exceptions import ValidationError3import pytest4from catalogue.models import CatalogueItem5from downloader.executors.athena import AthenaExecutor6from tests.factory import EntityFactory7ef = EntityFactory()8class CatalogueItemTestCase(TestCase):9    @pytest.fixture(autouse=True)10    def initfixtures(self, mocker):11        self.mocker = mocker12    def setUp(self):13        ef.clear()14    def test_simple_create(self):15        a = ef.account()16        spec = [17            {18                'name': 'location',19                'type': 'STRING',20                'size': 190234,21                'is_nullable': False,22                'is_enum': False,23                'distribution': None,24            },25            {26                'name': 'value',27                'type': 'FLOAT',28                'size': None,29                'is_nullable': True,30                'is_enum': True,31                'distribution': [32                    {'value': 18.0, 'count': 9},33                    {'value': 19.1, 'count': 45},34                    {'value': 21.2, 'count': 10},35                    {'value': None, 'count': 190},36                ],37            },38        ]39        ci = CatalogueItem.objects.create(40            maintained_by=a,41            name='weather_temperatures',42            sample=[43                {'location': 'Warsaw', 'value': 19.2},44                {'location': 'Wroclaw', 'value': 17.9},45            ],46            spec=spec,47            executor_type='DATABRICKS')48        assert ci.created_datetime is not None49        assert ci.updated_datetime is not None50        assert ci.created_by is None51        assert ci.updated_by is None52        assert ci.maintained_by == a53        assert ci.name == 'weather_temperatures'54        assert ci.sample == [55            {'location': 'Warsaw', 'value': 19.2},56            {'location': 'Wroclaw', 'value': 17.9},57        ]58        assert ci.spec == spec59    def test_invalid__name_should_be_unique(self):60        ef.catalogue_item(name='feature')61        with pytest.raises(ValidationError) as e:62            ef.catalogue_item(name='feature')63        assert e.value.message_dict == {64            'name': ['Catalogue item with this Name already exists.'],65        }66    def test_invalid__broken_spec_schema__missing_fields(self):67        a = ef.account()68        spec = [69            {70                'type': 'STRING',71                'size': 190234,72                'is_nullable': False,73                'is_enum': False,74                'distribution': None,75            },76        ]77        with pytest.raises(ValidationError) as e:78            CatalogueItem.objects.create(79                maintained_by=a,80                name='weather_temperatures',81                sample=[],82                spec=spec,83                executor_type='DATABRICKS')84        assert e.value.message_dict == {85            'spec': [86                "JSON did not validate. PATH: '0' REASON: 'name' is a "87                "required property",88            ],89        }90    def test_invalid__broken_spec_schema__wrong_type(self):91        a = ef.account()92        spec = [93            {94                'name': 'price',95                'type': 'STRING',96                'size': '190234',97                'is_nullable': False,98                'is_enum': False,99                'distribution': None,100            },101        ]102        with pytest.raises(ValidationError) as e:103            CatalogueItem.objects.create(104                maintained_by=a,105                name='weather_temperatures',106                sample=[],107                spec=spec,108                executor_type='DATABRICKS')109        assert e.value.message_dict == {110            'spec': [111                "JSON did not validate. PATH: '0.size' REASON: '190234' "112                "is not valid under any of the given schemas",113            ],114        }115    def test_invalid__broken_spec_distribution_types(self):116        a = ef.account()117        spec = [118            {119                'name': 'value',120                'type': 'FLOAT',121                'size': None,122                'is_nullable': True,123                'is_enum': True,124                'distribution': [125                    {'value': 18.0, 'count': 9},126                    {'value': '19', 'count': 45},127                    {'value': 21.2, 'count': 10},128                ],129            },130        ]131        with pytest.raises(ValidationError) as e:132            CatalogueItem.objects.create(133                maintained_by=a,134                name='weather_temperatures',135                sample=[],136                spec=spec,137                executor_type='DATABRICKS')138        assert e.value.message_dict == {139            'spec': [140                "column type and distribution value type mismatch detected "141                "for column 'value'",142            ],143        }144    def test_invalid__broken_spec_distribution__values_not_unique(self):145        a = ef.account()146        spec = [147            {148                'name': 'value',149                'type': 'FLOAT',150                'size': None,151                'is_nullable': True,152                'is_enum': True,153                'distribution': [154                    {'value': 19.0, 'count': 9},155                    {'value': 19.0, 'count': 45},156                    {'value': 21.2, 'count': 10},157                ],158            },159        ]160        with pytest.raises(ValidationError) as e:161            CatalogueItem.objects.create(162                maintained_by=a,163                name='weather_temperatures',164                sample=[],165                spec=spec,166                executor_type='DATABRICKS')167        assert e.value.message_dict == {168            'spec': [169                "not unique distribution values for column 'value' detected",170            ],171        }172    def test_invalid__broken_spec_distribution__counts_not_integers(self):173        a = ef.account()174        spec = [175            {176                'name': 'value',177                'type': 'FLOAT',178                'size': None,179                'is_nullable': True,180                'is_enum': True,181                'distribution': [182                    {'value': 19.0, 'count': 45.9},183                    {'value': 21.2, 'count': 10},184                ],185            },186        ]187        with pytest.raises(ValidationError) as e:188            CatalogueItem.objects.create(189                maintained_by=a,190                name='weather_temperatures',191                sample=[],192                spec=spec,193                executor_type='DATABRICKS')194        assert e.value.message_dict == {195            'spec': [196                "not integers distribution counts for column 'value' detected",197            ],198        }199    def test_invalid__broken_sample_not_same_names(self):200        a = ef.account()201        spec = [202            {203                'name': 'name',204                'type': 'STRING',205                'size': None,206                'is_nullable': True,207                'is_enum': True,208                'distribution': None,209            },210            {211                'name': 'price',212                'type': 'FLOAT',213                'size': None,214                'is_nullable': True,215                'is_enum': True,216                'distribution': None,217            },218        ]219        with pytest.raises(ValidationError) as e:220            CatalogueItem.objects.create(221                maintained_by=a,222                name='weather_temperatures',223                sample=[224                    {'name': 'apple'},225                    {'name': 'juice', 'price': 16},226                ],227                spec=spec,228                executor_type='DATABRICKS')229        assert e.value.message_dict == {230            '__all__': [231                'Sample column names and spec names are not identical',232            ],233        }234    def test_invalid__broken_sample_broken_types(self):235        a = ef.account()236        spec = [237            {238                'name': 'name',239                'type': 'STRING',240                'size': None,241                'is_nullable': True,242                'is_enum': True,243                'distribution': None,244            },245            {246                'name': 'price',247                'type': 'FLOAT',248                'size': None,249                'is_nullable': True,250                'is_enum': True,251                'distribution': None,252            },253        ]254        with pytest.raises(ValidationError) as e:255            CatalogueItem.objects.create(256                maintained_by=a,257                name='weather_temperatures',258                sample=[259                    {'name': 'apple', 'price': 'free'},260                    {'name': 'juice', 'price': 16},261                ],262                spec=spec,263                executor_type='DATABRICKS')264        assert e.value.message_dict == {265            '__all__': [266                "column type and sample value type mismatch detected for row "267                "number 0 column 'price'",268            ],269        }270    #271    # DATABASE272    #273    def test_database(self):274        a = ef.account()275        ci = CatalogueItem.objects.create(276            maintained_by=a,277            name='iot.events',278            sample=[{'value': 189}],279            spec=[280                {281                    'name': 'value',282                    'type': 'INTEGER',283                    'size': None,284                    'is_nullable': False,285                    'is_enum': False,286                    'distribution': None,287                },288            ],289            executor_type='ATHENA')290        assert ci.database == 'iot'291    #292    # TABLE293    #294    def test_table(self):295        a = ef.account()296        ci = CatalogueItem.objects.create(297            maintained_by=a,298            name='iot.events',299            sample=[{'value': 189}],300            spec=[301                {302                    'name': 'value',303                    'type': 'INTEGER',304                    'size': None,305                    'is_nullable': False,306                    'is_enum': False,307                    'distribution': None,308                },309            ],310            executor_type='ATHENA')311        assert ci.table == 'iot.events'312    #313    # UPDATE_SAMPLES_AND_DISTRIBUTIONS314    #315    def test_update_samples_and_distributions(self):316        a = ef.account()317        ci = CatalogueItem.objects.create(318            maintained_by=a,319            name='iot.events',320            sample=[],321            spec=[322                {323                    'name': 'name',324                    'type': 'STRING',325                    'size': None,326                    'is_nullable': False,327                    'is_enum': False,328                    'distribution': None,329                },330                {331                    'name': 'value',332                    'type': 'INTEGER',333                    'size': None,334                    'is_nullable': False,335                    'is_enum': False,336                    'distribution': None,337                },338            ],339            executor_type='ATHENA')340        get_sample = self.mocker.patch.object(AthenaExecutor, 'get_sample')341        get_sample.return_value = [342            {'name': 'temperature', 'value': 381},343            {'name': 'pressure', 'value': 13},344        ]345        get_size = self.mocker.patch.object(AthenaExecutor, 'get_size')346        get_size.side_effect = [678, 789]347        get_distribution = self.mocker.patch.object(348            AthenaExecutor, 'get_distribution')349        get_distribution.side_effect = [350            [{'value': 'temperature', 'count': 19}],351            [{'value': 233, 'count': 567}, {'value': 45, 'count': 123}],352        ]353        ci.update_samples_and_distributions()354        ci.refresh_from_db()355        assert ci.sample == [356            {'name': 'temperature', 'value': 381},357            {'name': 'pressure', 'value': 13},358        ]359        assert ci.spec == [360            {361                'name': 'name',362                'type': 'STRING',363                'size': 678,364                'is_nullable': False,365                'is_enum': False,366                'distribution': [{'value': 'temperature', 'count': 19}],367            },368            {369                'name': 'value',370                'type': 'INTEGER',371                'size': 789,372                'is_nullable': False,373                'is_enum': False,374                'distribution': [375                    {'value': 233, 'count': 567},376                    {'value': 45, 'count': 123},377                ],378            },...binary.py
Source:binary.py  
1# Licensed to the Apache Software Foundation (ASF) under one or more2# contributor license agreements.  See the NOTICE file distributed with3# this work for additional information regarding copyright ownership.4# The ASF licenses this file to You under the Apache License, Version 2.05# (the "License"); you may not use this file except in compliance with6# the License.  You may obtain a copy of the License at7#8#      http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13# See the License for the specific language governing permissions and14# limitations under the License.15from typing import Union16from pyignite.constants import *17from pyignite.datatypes.binary import (18    body_struct, enum_struct, schema_struct, binary_fields_struct,19)20from pyignite.datatypes import String, Int, Bool21from pyignite.queries import Query, Response22from pyignite.queries.op_codes import *23from pyignite.utils import int_overflow, entity_id24from .result import APIResult25def get_binary_type(26    connection: 'Connection', binary_type: Union[str, int], query_id=None,27) -> APIResult:28    """29    Gets the binary type information by type ID.30    :param connection: connection to Ignite server,31    :param binary_type: binary type name or ID,32    :param query_id: (optional) a value generated by client and returned as-is33     in response.query_id. When the parameter is omitted, a random value34     is generated,35    :return: API result data object.36    """37    query_struct = Query(38        OP_GET_BINARY_TYPE,39        [40            ('type_id', Int),41        ],42        query_id=query_id,43    )44    _, send_buffer = query_struct.from_python({45        'type_id': entity_id(binary_type),46    })47    connection.send(send_buffer)48    response_head_struct = Response([49        ('type_exists', Bool),50    ])51    response_head_type, recv_buffer = response_head_struct.parse(connection)52    response_head = response_head_type.from_buffer_copy(recv_buffer)53    response_parts = []54    if response_head.type_exists:55        resp_body_type, resp_body_buffer = body_struct.parse(connection)56        response_parts.append(('body', resp_body_type))57        resp_body = resp_body_type.from_buffer_copy(resp_body_buffer)58        recv_buffer += resp_body_buffer59        if resp_body.is_enum:60            resp_enum, resp_enum_buffer = enum_struct.parse(connection)61            response_parts.append(('enums', resp_enum))62            recv_buffer += resp_enum_buffer63        resp_schema_type, resp_schema_buffer = schema_struct.parse(connection)64        response_parts.append(('schema', resp_schema_type))65        recv_buffer += resp_schema_buffer66    response_class = type(67        'GetBinaryTypeResponse',68        (response_head_type,),69        {70            '_pack_': 1,71            '_fields_': response_parts,72        }73    )74    response = response_class.from_buffer_copy(recv_buffer)75    result = APIResult(response)76    if result.status != 0:77        return result78    result.value = {79        'type_exists': response.type_exists80    }81    if hasattr(response, 'body'):82        result.value.update(body_struct.to_python(response.body))83    if hasattr(response, 'enums'):84        result.value['enums'] = enum_struct.to_python(response.enums)85    if hasattr(response, 'schema'):86        result.value['schema'] = {87            x['schema_id']: [88                z['schema_field_id'] for z in x['schema_fields']89            ]90            for x in schema_struct.to_python(response.schema)91        }92    return result93def put_binary_type(94    connection: 'Connection', type_name: str, affinity_key_field: str=None,95    is_enum=False, schema: dict=None, query_id=None,96) -> APIResult:97    """98    Registers binary type information in cluster.99    :param connection: connection to Ignite server,100    :param type_name: name of the data type being registered,101    :param affinity_key_field: (optional) name of the affinity key field,102    :param is_enum: (optional) register enum if True, binary object otherwise.103     Defaults to False,104    :param schema: (optional) when register enum, pass a dict of enumerated105     parameter names as keys and an integers as values. When register binary106     type, pass a dict of field names: field types. Binary type with no fields107     is OK,108    :param query_id: (optional) a value generated by client and returned as-is109     in response.query_id. When the parameter is omitted, a random value110     is generated,111    :return: API result data object.112    """113    # prepare data114    if schema is None:115        schema = {}116    type_id = entity_id(type_name)117    data = {118        'type_name': type_name,119        'type_id': type_id,120        'affinity_key_field': affinity_key_field,121        'binary_fields': [],122        'is_enum': is_enum,123        'schema': [],124    }125    schema_id = None126    if is_enum:127        data['enums'] = []128        for literal, ordinal in schema.items():129            data['enums'].append({130                'literal': literal,131                'type_id': ordinal,132            })133    else:134        # assemble schema and calculate schema ID in one go135        schema_id = FNV1_OFFSET_BASIS if schema else 0136        for field_name, data_type in schema.items():137            # TODO: check for allowed data types138            field_id = entity_id(field_name)139            data['binary_fields'].append({140                'field_name': field_name,141                'type_id': int.from_bytes(142                    data_type.type_code,143                    byteorder=PROTOCOL_BYTE_ORDER144                ),145                'field_id': field_id,146            })147            schema_id ^= (field_id & 0xff)148            schema_id = int_overflow(schema_id * FNV1_PRIME)149            schema_id ^= ((field_id >> 8) & 0xff)150            schema_id = int_overflow(schema_id * FNV1_PRIME)151            schema_id ^= ((field_id >> 16) & 0xff)152            schema_id = int_overflow(schema_id * FNV1_PRIME)153            schema_id ^= ((field_id >> 24) & 0xff)154            schema_id = int_overflow(schema_id * FNV1_PRIME)155    data['schema'].append({156        'schema_id': schema_id,157        'schema_fields': [158            {'schema_field_id': entity_id(x)} for x in schema159        ],160    })161    # do query162    if is_enum:163        query_struct = Query(164            OP_PUT_BINARY_TYPE,165            [166                ('type_id', Int),167                ('type_name', String),168                ('affinity_key_field', String),169                ('binary_fields', binary_fields_struct),170                ('is_enum', Bool),171                ('enums', enum_struct),172                ('schema', schema_struct),173            ],174            query_id=query_id,175        )176    else:177        query_struct = Query(178            OP_PUT_BINARY_TYPE,179            [180                ('type_id', Int),181                ('type_name', String),182                ('affinity_key_field', String),183                ('binary_fields', binary_fields_struct),184                ('is_enum', Bool),185                ('schema', schema_struct),186            ],187            query_id=query_id,188        )189    result = query_struct.perform(connection, query_params=data)190    if result.status == 0:191        result.value = {192            'type_id': type_id,193            'schema_id': schema_id,194        }...FieldDescriptor.py
Source:FieldDescriptor.py  
1from Exceptions import FieldNameException2import keyword3class FieldDescriptor:4    def __init__(self, message_name, specifying, field_type, name, index, subtypes=None, is_enum=False):5        self.message_name = message_name6        self.specifying = specifying7        self.field_type = field_type8        self.name = name9        self.check_name()10        self.index = index11        self.subtypes = subtypes if subtypes is not None else None12        self.is_enum = is_enum13    def __str__(self):14        result = '{{"name": "{name}", ' \15                 '"type": {type}, ' \16                 '"subtypes": {subtypes}, ' \17                 '"index": {index}, ' \18                 '"is_enum": {is_enum}}}'.format(name=self.name,19                                                 type=self.field_type,20                                                 index=self.index,21                                                 subtypes=self.subtypes,22                                                 is_enum=self.is_enum)23        return result24    def check_name(self):25        if self.name in keyword.kwlist:26            raise FieldNameException(self.message_name,27                                     description="This name is keyword for python",28                                     name=self.name)29        if not self.name[0].isalpha():30            raise FieldNameException(self.message_name,31                                     description="Name must start with alpha",32                                     name=self.name)33        if self.name.find("-") != -1:34            raise FieldNameException(self.message_name,35                                     description="Name can't contain '-'",...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
