Best Python code snippet using pandera_python
test_database_sqlite.py
Source:test_database_sqlite.py  
...
    # Every ``test_schema_*`` method below follows the same shape: the value supplied by
    # ``@given`` is wrapped into a ``Services`` bundle by a ``SchemaNetworks`` factory
    # (``network_services_of`` / ``customer_services_of`` / ``diagram_services_of``) and
    # the bundle is handed to ``self._validate_schema`` together with ``caplog``.

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(cable_info=create_cable_info(False))
    def test_schema_cable_info(self, caplog, cable_info):
        self._validate_schema(SchemaNetworks().network_services_of(CableInfo, cable_info), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(no_load_test=create_no_load_test(False))
    def test_schema_no_load_test(self, caplog, no_load_test):
        self._validate_schema(SchemaNetworks().network_services_of(NoLoadTest, no_load_test), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(open_circuit_test=create_open_circuit_test(False))
    def test_schema_open_circuit_test(self, caplog, open_circuit_test):
        self._validate_schema(SchemaNetworks().network_services_of(OpenCircuitTest, open_circuit_test), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(overhead_wire_info=create_overhead_wire_info(False))
    def test_schema_overhead_wire_info(self, caplog, overhead_wire_info):
        self._validate_schema(SchemaNetworks().network_services_of(OverheadWireInfo, overhead_wire_info), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(power_transformer_info=create_power_transformer_info(False))
    def test_schema_power_transformer_info(self, caplog, power_transformer_info):
        self._validate_schema(SchemaNetworks().network_services_of(PowerTransformerInfo, power_transformer_info), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(short_circuit_test=create_short_circuit_test(False))
    def test_schema_short_circuit_test(self, caplog, short_circuit_test):
        self._validate_schema(SchemaNetworks().network_services_of(ShortCircuitTest, short_circuit_test), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(shunt_compensator_info=create_shunt_compensator_info(False))
    def test_schema_shunt_compensator_info(self, caplog, shunt_compensator_info):
        self._validate_schema(SchemaNetworks().network_services_of(ShuntCompensatorInfo, shunt_compensator_info), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(transformer_end_info=create_transformer_end_info(False))
    def test_schema_transformer_end_info(self, caplog, transformer_end_info):
        self._validate_schema(SchemaNetworks().network_services_of(TransformerEndInfo, transformer_end_info), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(transformer_tank_info=create_transformer_tank_info(False))
    def test_schema_transformer_tank_info(self, caplog, transformer_tank_info):
        self._validate_schema(SchemaNetworks().network_services_of(TransformerTankInfo, transformer_tank_info), caplog)

    # ************ IEC61968 ASSETS ************
    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(asset_owner=create_asset_owner(False))
    def test_schema_asset_owner(self, caplog, asset_owner):
        self._validate_schema(SchemaNetworks().network_services_of(AssetOwner, asset_owner), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(pole=create_pole(False))
    def test_schema_pole(self, caplog, pole):
        self._validate_schema(SchemaNetworks().network_services_of(Pole, pole), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(streetlight=create_streetlight(False))
    def test_schema_streetlight(self, caplog, streetlight):
        self._validate_schema(SchemaNetworks().network_services_of(Streetlight, streetlight), caplog)

    # ************ IEC61968 CUSTOMERS ************
    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(customer=create_customer(False))
    def test_schema_customer(self, caplog, customer):
        self._validate_schema(SchemaNetworks().customer_services_of(Customer, customer), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(customer_agreement=create_customer_agreement(False))
    def test_schema_customer_agreement(self, caplog, customer_agreement):
        self._validate_schema(SchemaNetworks().customer_services_of(CustomerAgreement, customer_agreement), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(pricing_structure=create_pricing_structure(False))
    def test_schema_pricing_structure(self, caplog, pricing_structure):
        self._validate_schema(SchemaNetworks().customer_services_of(PricingStructure, pricing_structure), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(tariffs=create_tariffs(False))
    def test_schema_tariffs(self, caplog, tariffs):
        self._validate_schema(SchemaNetworks().customer_services_of(Tariff, tariffs), caplog)

    # ************ IEC61968 METERING ************
    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(meter=create_meter(False))
    def test_schema_meter(self, caplog, meter):
        self._validate_schema(SchemaNetworks().network_services_of(Meter, meter), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(usage_point=create_usage_point(False))
    def test_schema_usage_point(self, caplog, usage_point):
        self._validate_schema(SchemaNetworks().network_services_of(UsagePoint, usage_point), caplog)

    # ************ IEC61968 COMMON ************
    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(location=create_location(False))
    def test_schema_location(self, caplog, location):
        self._validate_schema(SchemaNetworks().network_services_of(Location, location), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(organisation=create_organisation(False))
    def test_schema_organisation(self, caplog, organisation):
        self._validate_schema(SchemaNetworks().network_services_of(Organisation, organisation), caplog)

    # Same Organisation input as the test above, but wrapped with customer_services_of
    # instead of network_services_of.
    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(organisation=create_organisation(False))
    def test_schema_organisation_customer(self, caplog, organisation):
        self._validate_schema(SchemaNetworks().customer_services_of(Organisation, organisation), caplog)

    # ************ IEC61968 OPERATIONS ************
    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(operational_restriction=create_operational_restriction(False))
    def test_schema_operational_restriction(self, caplog, operational_restriction):
        self._validate_schema(SchemaNetworks().network_services_of(OperationalRestriction, operational_restriction), caplog)

    # ************ IEC61970 BASE AUXILIARY EQUIPMENT ************
    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(fault_indicator=create_fault_indicator(False))
    def test_schema_fault_indicator(self, caplog, fault_indicator):
        self._validate_schema(SchemaNetworks().network_services_of(FaultIndicator, fault_indicator), caplog)

    # ************ IEC61970 BASE CORE ************
    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(base_voltage=create_base_voltage(False))
    def test_schema_base_voltage(self, caplog, base_voltage):
        self._validate_schema(SchemaNetworks().network_services_of(BaseVoltage, base_voltage), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(connectivity_node=create_connectivity_node(False))
    def test_schema_connectivity_node(self, caplog, connectivity_node):
        self._validate_schema(SchemaNetworks().network_services_of(ConnectivityNode, connectivity_node), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(feeder=create_feeder(False))
    def test_schema_feeder(self, caplog, feeder):
        self._validate_schema(SchemaNetworks().network_services_of(Feeder, feeder), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(geographical_region=create_geographical_region(False))
    def test_schema_geographical_region(self, caplog, geographical_region):
        self._validate_schema(SchemaNetworks().network_services_of(GeographicalRegion, geographical_region), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(site=create_site(False))
    def test_schema_site(self, caplog, site):
        self._validate_schema(SchemaNetworks().network_services_of(Site, site), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(sub_geographical_region=create_sub_geographical_region(False))
    def test_schema_sub_geographical_region(self, caplog, sub_geographical_region):
        self._validate_schema(SchemaNetworks().network_services_of(SubGeographicalRegion, sub_geographical_region), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(substation=create_substation(False))
    def test_schema_substation(self, caplog, substation):
        self._validate_schema(SchemaNetworks().network_services_of(Substation, substation), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(terminal=create_terminal(False))
    def test_schema_terminal(self, caplog, terminal):
        self._validate_schema(SchemaNetworks().network_services_of(Terminal, terminal), caplog)

    # ************ IEC61970 BASE DIAGRAM LAYOUT ************
    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(diagram=create_diagram(False))
    def test_schema_diagram(self, caplog, diagram):
        self._validate_schema(SchemaNetworks().diagram_services_of(Diagram, diagram), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(diagram_object=create_diagram_object(False))
    def test_schema_diagram_object(self, caplog, diagram_object):
        self._validate_schema(SchemaNetworks().diagram_services_of(DiagramObject, diagram_object), caplog)

    # ************ IEC61970 BASE EQUIVALENTS ************
    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(equivalent_branch=create_equivalent_branch(False))
    def test_schema_equivalent_branch(self, caplog, equivalent_branch):
        self._validate_schema(SchemaNetworks().network_services_of(EquivalentBranch, equivalent_branch), caplog)

    # ************ IEC61970 BASE MEAS ************
    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(accumulator=create_accumulator(False))
    def test_schema_accumulator(self, caplog, accumulator):
        self._validate_schema(SchemaNetworks().network_services_of(Accumulator, accumulator), caplog)
        # self._validate_schema(SchemaNetworks().measurement_services_of(AccumulatorValue, data.draw(create_accumulator_value(False))), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(analog=create_analog(False))
    def test_schema_analog(self, caplog, analog):
        self._validate_schema(SchemaNetworks().network_services_of(Analog, analog), caplog)
        # self._validate_schema(SchemaNetworks().measurement_services_of(AnalogValue, data.draw(create_analog_value(False))), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(control=create_control(False))
    def test_schema_control(self, caplog, control):
        self._validate_schema(SchemaNetworks().network_services_of(Control, control), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(discrete=create_discrete(False))
    def test_schema_discrete(self, caplog, discrete):
        self._validate_schema(SchemaNetworks().network_services_of(Discrete, discrete), caplog)
        # self._validate_schema(SchemaNetworks().measurement_services_of(DiscreteValue, data.draw(create_discrete_value(False))), caplog)

    # ************ IEC61970 BASE SCADA ************
    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(remote_control=create_remote_control(False))
    def test_schema_remote_control(self, caplog, remote_control):
        self._validate_schema(SchemaNetworks().network_services_of(RemoteControl, remote_control), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(remote_source=create_remote_source(False))
    def test_schema_remote_source(self, caplog, remote_source):
        self._validate_schema(SchemaNetworks().network_services_of(RemoteSource, remote_source), caplog)

    # ************ IEC61970 BASE WIRES GENERATION PRODUCTION ************
    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(battery_unit=create_battery_unit(False))
    def test_schema_battery_unit(self, caplog, battery_unit):
        self._validate_schema(SchemaNetworks().network_services_of(BatteryUnit, battery_unit), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(photovoltaic_unit=create_photovoltaic_unit(False))
    def test_schema_photovoltaic_unit(self, caplog, photovoltaic_unit):
        self._validate_schema(SchemaNetworks().network_services_of(PhotoVoltaicUnit, photovoltaic_unit), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(power_electronics_connection=create_power_electronics_connection(False))
    def test_schema_power_electronics_connection(self, caplog, power_electronics_connection):
        self._validate_schema(SchemaNetworks().network_services_of(PowerElectronicsConnection, power_electronics_connection), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(pec_phase=create_power_electronics_connection_phase(False))
    def test_schema_power_electronics_connection_phase(self, caplog, pec_phase):
        self._validate_schema(SchemaNetworks().network_services_of(PowerElectronicsConnectionPhase, pec_phase), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(power_electronics_wind_unit=create_power_electronics_wind_unit(False))
    def test_schema_power_electronics_wind_unit(self, caplog, power_electronics_wind_unit):
        self._validate_schema(SchemaNetworks().network_services_of(PowerElectronicsWindUnit, power_electronics_wind_unit), caplog)

    # ************ IEC61970 BASE WIRES ************
    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(ac_line_segment=create_ac_line_segment(False))
    def test_schema_ac_line_segment(self, caplog, ac_line_segment):
        self._validate_schema(SchemaNetworks().network_services_of(AcLineSegment, ac_line_segment), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(breaker=create_breaker(False))
    def test_schema_breaker(self, caplog, breaker):
        self._validate_schema(SchemaNetworks().network_services_of(Breaker, breaker), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(busbar_section=create_busbar_section(False))
    def test_schema_busbar_section(self, caplog, busbar_section):
        self._validate_schema(SchemaNetworks().network_services_of(BusbarSection, busbar_section), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(disconnector=create_disconnector(False))
    def test_schema_disconnector(self, caplog, disconnector):
        self._validate_schema(SchemaNetworks().network_services_of(Disconnector, disconnector), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(energy_consumer=create_energy_consumer(False))
    def test_schema_energy_consumer(self, caplog, energy_consumer):
        # Discard generated examples in which two entries of `phases` share the same
        # `phase` value: the count of distinct values must equal the number of phases.
        assume(len(Counter(map(lambda it: it.phase, energy_consumer.phases))) == len(list(energy_consumer.phases)))
        self._validate_schema(SchemaNetworks().network_services_of(EnergyConsumer, energy_consumer), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(energy_consumer_phase=create_energy_consumer_phase(False))
    def test_schema_energy_consumer_phase(self, caplog, energy_consumer_phase):
        self._validate_schema(SchemaNetworks().network_services_of(EnergyConsumerPhase, energy_consumer_phase), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(energy_source=create_energy_source(False))
    def test_schema_energy_source(self, caplog, energy_source):
        # Discard generated examples in which two entries of `phases` share the same
        # `phase` value: the count of distinct values must equal the number of phases.
        assume(len(Counter(map(lambda it: it.phase, energy_source.phases))) == len(list(energy_source.phases)))
        self._validate_schema(SchemaNetworks().network_services_of(EnergySource, energy_source), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(energy_source_phase=create_energy_source_phase(False))
    def test_schema_energy_source_phase(self, caplog, energy_source_phase):
        self._validate_schema(SchemaNetworks().network_services_of(EnergySourcePhase, energy_source_phase), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(fuse=create_fuse(False))
    def test_schema_fuse(self, caplog, fuse):
        self._validate_schema(SchemaNetworks().network_services_of(Fuse, fuse), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(jumper=create_jumper(False))
    def test_schema_jumper(self, caplog, jumper):
        self._validate_schema(SchemaNetworks().network_services_of(Jumper, jumper), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(junction=create_junction(False))
    def test_schema_junction(self, caplog, junction):
        self._validate_schema(SchemaNetworks().network_services_of(Junction, junction), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(linear_shunt_compensator=create_linear_shunt_compensator(False))
    def test_schema_linear_shunt_compensator(self, caplog, linear_shunt_compensator):
        self._validate_schema(SchemaNetworks().network_services_of(LinearShuntCompensator, linear_shunt_compensator), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(load_break_switch=create_load_break_switch(False))
    def test_schema_load_break_switch(self, caplog, load_break_switch):
        self._validate_schema(SchemaNetworks().network_services_of(LoadBreakSwitch, load_break_switch), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(per_length_sequence_impedance=create_per_length_sequence_impedance(False))
    def test_schema_per_length_sequence_impedance(self, caplog, per_length_sequence_impedance):
        self._validate_schema(SchemaNetworks().network_services_of(PerLengthSequenceImpedance, per_length_sequence_impedance), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(power_transformer=create_power_transformer(False))
    def test_schema_power_transformer(self, caplog, power_transformer):
        self._validate_schema(SchemaNetworks().network_services_of(PowerTransformer, power_transformer), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(power_transformer_end=create_power_transformer_end(False))
    def test_schema_power_transformer_end(self, caplog, power_transformer_end):
        self._validate_schema(SchemaNetworks().network_services_of(PowerTransformerEnd, power_transformer_end), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(ratio_tap_changer=create_ratio_tap_changer(False))
    def test_schema_ratio_tap_changer(self, caplog, ratio_tap_changer):
        self._validate_schema(SchemaNetworks().network_services_of(RatioTapChanger, ratio_tap_changer), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(recloser=create_recloser(False))
    def test_schema_recloser(self, caplog, recloser):
        self._validate_schema(SchemaNetworks().network_services_of(Recloser, recloser), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(transformer_star_impedance=create_transformer_star_impedance(False))
    def test_schema_transformer_star_impedance(self, caplog, transformer_star_impedance):
        self._validate_schema(SchemaNetworks().network_services_of(TransformerStarImpedance, transformer_star_impedance), caplog)

    # ************ IEC61970 InfIEC61970 ************
    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(circuit=create_circuit(False))
    def test_schema_circuit(self, caplog, circuit):
        self._validate_schema(SchemaNetworks().network_services_of(Circuit, circuit), caplog)

    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(loop=create_loop(False))
    def test_schema_loop(self, caplog, loop):
        self._validate_schema(SchemaNetworks().network_services_of(Loop, loop), caplog)

    # noinspection PyShadowingNames
    @log_on_failure_decorator
    @settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
    @given(lv_feeder=create_lv_feeder(False))
    def test_schema_lv_feeder(self, caplog, lv_feeder):
        self._validate_schema(SchemaNetworks().network_services_of(LvFeeder, lv_feeder), caplog)

    # ************ Services ************
    @log_on_failure_decorator
    def test_metadata_data_source_schema(self, caplog):
        """Validate the services returned by `SchemaNetworks.create_data_source_test_services`."""
        self._validate_schema(SchemaNetworks().create_data_source_test_services(), caplog)

    @log_on_failure_decorator
    def test_name_and_name_type_schema(self, caplog):
        """Validate the services returned by `SchemaNetworks.create_name_test_services`."""
        self._validate_schema(SchemaNetworks().create_name_test_services(), caplog)

    @log_on_failure_decorator
    def test_error_on_duplicate_id_added_to_customer_service(self, caplog):
        """Put the same Customer in both the write and read services and expect a duplicate-MRID failure."""
        write_services = Services()
        read_services = Services()
        customer = Customer(mrid="customer1")
        write_services.customer_service.add(customer)
        # Pre-populating the read side with the same object is what creates the duplicate.
        read_services.customer_service.add(customer)
        self._test_duplicate_mrid_error(write_services, read_services, read_services.customer_service, customer, caplog)

    @log_on_failure_decorator
    def test_error_on_duplicate_id_added_to_diagram_service(self, caplog):
        """Put the same Diagram in both the write and read services and expect a duplicate-MRID failure."""
        write_services = Services()
        read_services = Services()
        diagram = Diagram(mrid="diagram1")
        write_services.diagram_service.add(diagram)
        # Pre-populating the read side with the same object is what creates the duplicate.
        read_services.diagram_service.add(diagram)
        self._test_duplicate_mrid_error(write_services, read_services, read_services.diagram_service, diagram, caplog)

    @log_on_failure_decorator
    def test_error_on_duplicate_id_added_to_network_service(self, caplog):
        """Put the same Junction in both the write and read services and expect a duplicate-MRID failure."""
        write_services = Services()
        read_services = Services()
        junction = Junction(mrid="junction1")
        write_services.network_service.add(junction)
        # Pre-populating the read side with the same object is what creates the duplicate.
        read_services.network_service.add(junction)
        self._test_duplicate_mrid_error(write_services, read_services, read_services.network_service, junction, caplog)

    @log_on_failure_decorator
    def test_only_loads_street_address_fields_if_required(self, caplog):
        """
        Write a Location whose address holds only empty TownDetail/StreetDetail objects and
        check that the Location read back has a main address equal to a default StreetAddress().
        """
        write_services = Services()
        # NOTE(review): this instance is replaced by the second `read_services = Services()`
        # below before it is ever used; it looks redundant - confirm before removing.
        read_services = Services()
        # noinspection PyArgumentList
        location = Location(mrid="loc1", main_address=StreetAddress(town_detail=TownDetail(), street_detail=StreetDetail()))
        write_services.network_service.add(location)
        read_services = Services()

        def validate_read(success):
            # Callback handed to _test_write_read; `success` is supplied by that helper.
            assert success
            loc = read_services.network_service.get("loc1", Location)
            assert loc.main_address == StreetAddress()

        self._test_write_read(
            write_services,
            read_services,
            validate_read,
            caplog
        )

    def _test_duplicate_mrid_error(
        self,
        write_services: Services,
        read_services: Services,
        service_with_duplicate: BaseService,
        duplicate: IdentifiedObject,
        caplog
    ):
        """
        Run `_test_write_read` and require that it reports failure with a duplicate-MRID message.

        :param write_services: Services passed to `_test_write_read` as the first argument.
        :param read_services: Services passed to `_test_write_read` as the second argument.
        :param service_with_duplicate: The service whose `name` is expected in the error message.
        :param duplicate: The object expected to be named in the error message.
        :param caplog: Log-capture object; its `text` must contain the expected message.
        """
        expected_error = f"Failed to load {duplicate}. Unable to add to service '{service_with_duplicate.name}': duplicate MRID"

        def validate_read(success):
            # The operation must be reported as failed *and* must have logged the expected message.
            assert not success
            assert expected_error in caplog.text

        self._test_write_read(
            write_services,
            read_services,
            validate_read,
            caplog
        )

    def _validate_schema(self, expected: Services, caplog):
        """
        Compare freshly created, initially empty `Services` against `expected` after `_test_write_read` runs.

        NOTE(review): the call to `_test_write_read` is cut off in this excerpt; presumably it
        writes `expected`, loads into `read_services` and then invokes `validate_read` - confirm
        against the helper's definition.

        :param expected: The services holding the objects under test.
        :param caplog: Log-capture object forwarded by the calling test.
        """
        # Filter generated data through assume_non_blank_street_address_details before doing any work.
        for location in expected.network_service.objects(Location):
            assume_non_blank_street_address_details(location.main_address)
        read_services = Services()

        def validate_read(success):
            assert success
            self._validate_metadata(read_services.metadata_collection, expected.metadata_collection)
            self._validate_service(read_services.network_service, expected.network_service, NetworkServiceComparator())
            self._validate_service(read_services.diagram_service, expected.diagram_service, DiagramServiceComparator())
            self._validate_service(read_services.customer_service, expected.customer_service, CustomerServiceComparator())
            # Every expected DiagramObject that references an identified object must be
            # retrievable from the read diagram service through that reference.
            for d_obj in filter(lambda it: it.identified_object_mrid is not None, expected.diagram_service.objects(DiagramObject)):
                assert read_services.diagram_service.get_diagram_objects(d_obj.identified_object_mrid)

        self._test_write_read(
            expected,
            read_services,
...
__main__.py
Source:__main__.py  
# NOTE: this excerpt begins partway through a helper whose header is not
# shown (presumably `_toml_from_file`, which the loaders below call).  The
# visible tail is kept verbatim for reference:
#
#         ret = pytoml.load(file)
#     except pytoml.TomlError as e:
#         raise UserError(f'{file.name}: TOML is malformed: {e}')
#     return ret


def _validate_schema(d, schema, fname):
    """Check *d* against *schema* and fill in defaults for missing keys.

    *schema* maps each key to a ``(type, default)`` pair; a default of
    ``None`` marks the key as required.  *d* is modified in place: every
    key that is absent receives its own copy of the default.

    Raises:
        UserError: if *d* is not a dict, a required key is missing, or a
            value is not an instance of the declared type.  *fname* is only
            used as a prefix for the message.
    """
    import copy  # local import: only needed to clone mutable defaults

    # Entries of a list (e.g. `out_file_specs`) may be arbitrary TOML values;
    # report that as a user error rather than crashing on `.get`.
    if not isinstance(d, dict):
        raise UserError(f'{fname}: expected a table of parameters')
    for name, (typ, defval) in schema.items():
        val = d.get(name, defval)
        if val is None:
            raise UserError(f'{fname}: required parameter {name} not found')
        if not isinstance(val, typ):
            raise UserError(f'{fname}: {name}: type mismatch')
        if name not in d:
            # Copy the default so the `{}` / `[]` objects stored in the
            # module-level schema tables are never shared between (or
            # mutated through) the dicts being validated.
            d[name] = copy.deepcopy(defval)


_outfilespec_schema = {
    'name': (str, None),
    'outputspecs': (list, None),
}


def get_outfilespec(ofs, fname):
    """Validate the dict *ofs* and return a ``launch.OutFileSpec``.

    Each entry of ``ofs['outputspecs']`` is converted with
    :func:`get_outputspec`.  *fname* is used in error messages.
    """
    _validate_schema(ofs, _outfilespec_schema, fname)
    outputspecs = [get_outputspec(spec, fname=fname)
                   for spec in ofs['outputspecs']]
    return launch.OutFileSpec(ofs['name'], outputspecs)


_outputspec_schema = {
    'vartypes': (dict, None),
    'regex': (str, None),
}


def get_outputspec(spec, fname):
    """Validate the dict *spec* and return a ``launch.OutputSpec``.

    ``spec['vartypes']`` is additionally checked by
    :func:`_validate_param_types`.
    """
    _validate_schema(spec, _outputspec_schema, fname)
    _validate_param_types(spec['vartypes'], fname)
    return launch.OutputSpec(spec['vartypes'], spec['regex'])


def _validate_param_types(params, fname):
    """Check that *params* maps valid identifiers to known type names.

    Raises:
        UserError: if *params* is not a dict, a key does not match
            ``ParamSpecParser.ident_regex``, or a value is not contained in
            ``launch.types``.
    """
    if not isinstance(params, dict):
        raise UserError(f'{fname}: params must be a dictionary')
    for k, v in params.items():
        if not re.match(ParamSpecParser.ident_regex, k):
            raise UserError(
                f'{fname}: parameter {k} is not a valid identifier')
        if v not in launch.types:
            raise UserError(
                f'{fname}: parameter {k}: {v} is not one of int, float, str')


_machine_spec_schema = {
    'host': (str, None),
    'username': (str, None),
    'port': (int, 22),
}


def load_machine_spec(file):
    """Read a machine spec from the open TOML *file*.

    Returns ``launch.MachineSpec(**d)`` where *d* is the validated table
    (``port`` defaults to 22).
    """
    d = _toml_from_file(file)
    _validate_schema(d, _machine_spec_schema, file.name)
    return launch.MachineSpec(**d)


_launch_profile_schema = {
    'machine': (str, None),
    'start_cmd': (str, None),
    'poll_cmd': (str, None),
    'cancel_cmd': (str, None),
    'base_dir': (str, None),
    'params': (dict, {}),
    'param_order': (list, []),
    'out_file_specs': (list, []),
}


def load_launch_profile(file):
    """Read a launch profile from the open TOML *file*.

    The ``machine`` entry names another file, which is opened through
    ``_open_file`` and loaded with :func:`load_machine_spec`; the entries of
    ``out_file_specs`` are converted with :func:`get_outfilespec`.

    Raises:
        UserError: on schema violations, or if ``param_order`` mentions a
            name that is not a key of ``params``.
    """
    d = _toml_from_file(file)
    _validate_schema(d, _launch_profile_schema, file.name)
    if not all(x in d['params'] for x in d['param_order']):
        raise UserError(f'{file.name}: unknown parameters in param_order')
    # NOTE(review): `_open_file` is defined outside this excerpt; presumably
    # it resolves the path relative to the referring file -- confirm.
    with _open_file(d['machine'], file.name) as f:
        d['machine'] = load_machine_spec(f)
    d['out_file_specs'] = [get_outfilespec(ofs, fname=file.name)
                           for ofs in d['out_file_specs']]
    return launch.LaunchProfile(**d)


_infilespec_schema = {
    'name': (str, None),
    'template': (str, None),
}


def get_infilespec(d, fname):
    """Validate the dict *d* and return a ``launch.InFileSpec``.

    ``d['template']`` names a file; its full contents are read and passed
    as the template text.
    """
    _validate_schema(d, _infilespec_schema, fname)
    with _open_file(d['template'], fname) as f:
        template = f.read()
    return launch.InFileSpec(d['name'], template)


_prog_spec_schema = {
    'params': (dict, None),
    'args': (list, None),
    'stdout': (list, []),
    'out_file_specs': (list, []),
    'in_file_specs': (list, []),
}


def load_prog_spec(file):
    """Read a program spec from the open TOML *file*.

    Returns ``launch.ProgSpec(**d)`` after converting ``stdout``,
    ``out_file_specs`` and ``in_file_specs`` entries to their spec objects.

    Raises:
        UserError: on schema violations, invalid parameter types, or a
            non-string entry in ``args``.
    """
    d = _toml_from_file(file)
    _validate_schema(d, _prog_spec_schema, file.name)
    _validate_param_types(d['params'], file.name)
    if not all(isinstance(x, str) for x in d['args']):
        raise UserError(f'{file.name}: args: type mismatch')
    d['stdout'] = [get_outputspec(spec, fname=file.name)
                   for spec in d['stdout']]
    d['out_file_specs'] = [get_outfilespec(ofs, fname=file.name)
                           for ofs in d['out_file_specs']]
    d['in_file_specs'] = [get_infilespec(ifs, fname=file.name)
                          for ifs in d['in_file_specs']]
    return launch.ProgSpec(**d)


def write_launch_specs(file, launch_profile_file, prog_spec_file, executable, launch_specs):
    """Dump *launch_specs* and the files they refer to as TOML into *file*.

    Each launch spec is converted with ``attr.asdict``.  The resulting
    document has the keys ``launch_profile``, ``prog_spec``, ``executable``
    and ``specs``.
    """
    all_specs = [attr.asdict(spec) for spec in launch_specs]
    pytoml.dump({'launch_profile': launch_profile_file,
                 'prog_spec': prog_spec_file,
                 'executable': executable,
                 'specs': all_specs}, file)


_infile_creation_spec_schema = {
    'name': (str, None),
    'contents': (str, None),
}


def get_infile_creation_spec(spec, fname):
    """Validate the dict *spec* and return ``launch.InFileCreationSpec(**spec)``."""
    _validate_schema(spec, _infile_creation_spec_schema, fname)
    return launch.InFileCreationSpec(**spec)


_launch_spec_schema = {
    'args': (list, None),
    'params': (dict, None),
    'infiles': (list, None),
}

_launch_spec_file_schema = {
    'launch_profile': (str, None),
    'prog_spec': (str, None),
    'executable': (str, None),
    'specs': (list, None),
}


def load_launch_specs(file):
    """Read a launch-spec document from the open TOML *file*.

    The ``launch_profile`` and ``prog_spec`` entries name further files,
    which are opened through ``_open_file`` and loaded in turn.

    Returns:
        A tuple ``(launch_profile, prog_spec, executable, spec_list)`` where
        *spec_list* holds one ``launch.LaunchSpec`` per entry of ``specs``.
    """
    obj = _toml_from_file(file)
    _validate_schema(obj, _launch_spec_file_schema, file.name)
    with _open_file(obj['launch_profile'], file.name) as f:
        launch_profile = load_launch_profile(f)
    with _open_file(obj['prog_spec'], file.name) as f:
        prog_spec = load_prog_spec(f)
    spec_list = []
    for d in obj['specs']:
        _validate_schema(d, _launch_spec_schema, file.name)
        ifcs = [get_infile_creation_spec(ifc, fname=file.name)
                for ifc in d['infiles']]
        spec_list.append(launch.LaunchSpec(d['args'], d['params'], ifcs))
    return launch_profile, prog_spec, obj['executable'], spec_list


def write_results(file, results):
    """Dump *results* as TOML into *file* under the key ``result``.

    *results* maps an index to an object with ``jobid``, ``state`` and
    ``cwd`` attributes; the state is stored as its status name plus exit
    code, and the job id is stored as a string.
    """
    ret = [
        {
            'index': index,
            'jobid': str(res.jobid),
            'state': {'status': res.state.status.name,
                      'exit_code': res.state.exit_code},
            'cwd': res.cwd,
        }
        for index, res in results.items()
    ]
    pytoml.dump({'result': ret}, file)


_jobstate_schema = {
    'status': (str, None),
    'exit_code': (int, None),
}


def get_job_state(obj, fname):
    """Validate the dict *obj* and return a ``jobs.JobState``.

    Raises:
        UserError: on schema violations or if ``obj['status']`` is not a
            member name of ``jobs.JobStateType``.
    """
    _validate_schema(obj, _jobstate_schema, fname)
    try:
        # NOTE(review): assumes `jobs.JobStateType` is an Enum (the writer
        # above stores `status.name`), so lookup by name raises KeyError for
        # an unknown status -- confirm.
        status = jobs.JobStateType[obj['status']]
    except KeyError:
        raise UserError(
            f"{fname}: unknown job status {obj['status']!r}") from None
    return jobs.JobState(status, obj['exit_code'])


_result_schema = {
    'index': (int, None),
    'jobid': (str, None),
    'state': (dict, None),
    'cwd': (str, None),
}


def load_results(file):
    """Read a results document from the open TOML *file*.

    Returns:
        A dict mapping each entry's ``index`` to a ``launch.Result`` built
        from its ``jobid``, parsed ``state`` and ``cwd``.

    Raises:
        UserError: if the document is not a table, has no ``result`` key,
            or an entry violates the result schema.
    """
    obj = _toml_from_file(file)
    if not isinstance(obj, dict):
        raise UserError('invalid result spec')
    try:
        dlist = obj['result']
    except KeyError:
        raise UserError('invalid result spec') from None
    results = {}
    for d in dlist:
        _validate_schema(d, _result_schema, file.name)
        results[d['index']] = launch.Result(
            d['jobid'],
            get_job_state(d['state'], file.name),
            d['cwd'])
    return results


def genparams(args):
    # Loads the launch profile and program spec named by *args*, expands the
    # parameter spec read from `args.param_spec`, and reports the number of
    # runs on stderr.
    site_spec = load_launch_profile(args.launch_profile)
    prog_spec = load_prog_spec(args.prog_spec)
    try:
        params = gen_params(args.param_spec.read())
    except ParamSpecError as e:
        raise UserError(str(e)) from e
    params = list(params)
    print(f'Number of runs: {len(params)}', file=sys.stderr)
    # ... (the remainder of genparams is not shown in this excerpt)


# __init__.py
Source:__init__.py  
...21                                   TX_SCHEMA_VERSION)22_, TX_SCHEMA_TRANSFER = _load_schema('transaction_transfer_' +23                                     TX_SCHEMA_VERSION)24VOTE_SCHEMA_PATH, VOTE_SCHEMA = _load_schema('vote')25def _validate_schema(schema, body):26    """Validate data against a schema"""27    # Note28    #29    # Schema validation is currently the major CPU bottleneck of30    # BigchainDB. the `jsonschema` library validates python data structures31    # directly and produces nice error messages, but validation takes 4+ ms32    # per transaction which is pretty slow. The rapidjson library validates33    # much faster at 1.5ms, however it produces _very_ poor error messages.34    # For this reason we use both, rapidjson as an optimistic pathway and35    # jsonschema as a fallback in case there is a failure, so we can produce36    # a helpful error message.37    try:38        schema[1].validate(rapidjson.dumps(body))39    except ValueError as exc:40        try:41            jsonschema.validate(body, schema[0])42        except jsonschema.ValidationError as exc2:43            raise SchemaValidationError(str(exc2)) from exc244        logger.warning('code problem: jsonschema did not raise an exception, wheras rapidjson raised %s', exc)45        raise SchemaValidationError(str(exc)) from exc46def validate_transaction_schema(tx):47    """Validate a transaction dict.48    TX_SCHEMA_COMMON contains properties that are common to all types of49    transaction. TX_SCHEMA_[TRANSFER|CREATE] add additional constraints on top.50    """51    _validate_schema(TX_SCHEMA_COMMON, tx)52    if tx['operation'] == 'TRANSFER':53        _validate_schema(TX_SCHEMA_TRANSFER, tx)54    else:55        _validate_schema(TX_SCHEMA_CREATE, tx)56def validate_vote_schema(vote):57    """Validate a vote dict"""...Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
It covers everything from setting up the prerequisites for running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
