How to use the `_validate_schema` method in pandera

Best Python code snippets using pandera_python

test_database_sqlite.py

Source: test_database_sqlite.py (GitHub)

copy

Full Screen

# ...
# NOTE: this listing starts part-way through a test class (original lines
# 57-493). Every function below takes `self` and calls other methods through
# it, so they are methods of that class; the class header and the module
# imports lie outside the excerpt, which is why the code is shown unindented.
#
# Pattern of the `test_schema_*` methods: `@given` draws one object from the
# matching `create_*` strategy, `SchemaNetworks().<kind>_services_of(...)`
# wraps it, and the result is handed to `self._validate_schema`.

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(cable_info=create_cable_info(False))
def test_schema_cable_info(self, caplog, cable_info):
    self._validate_schema(SchemaNetworks().network_services_of(CableInfo, cable_info), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(no_load_test=create_no_load_test(False))
def test_schema_no_load_test(self, caplog, no_load_test):
    self._validate_schema(SchemaNetworks().network_services_of(NoLoadTest, no_load_test), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(open_circuit_test=create_open_circuit_test(False))
def test_schema_open_circuit_test(self, caplog, open_circuit_test):
    self._validate_schema(SchemaNetworks().network_services_of(OpenCircuitTest, open_circuit_test), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(overhead_wire_info=create_overhead_wire_info(False))
def test_schema_overhead_wire_info(self, caplog, overhead_wire_info):
    self._validate_schema(SchemaNetworks().network_services_of(OverheadWireInfo, overhead_wire_info), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(power_transformer_info=create_power_transformer_info(False))
def test_schema_power_transformer_info(self, caplog, power_transformer_info):
    self._validate_schema(SchemaNetworks().network_services_of(PowerTransformerInfo, power_transformer_info), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(short_circuit_test=create_short_circuit_test(False))
def test_schema_short_circuit_test(self, caplog, short_circuit_test):
    self._validate_schema(SchemaNetworks().network_services_of(ShortCircuitTest, short_circuit_test), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(shunt_compensator_info=create_shunt_compensator_info(False))
def test_schema_shunt_compensator_info(self, caplog, shunt_compensator_info):
    self._validate_schema(SchemaNetworks().network_services_of(ShuntCompensatorInfo, shunt_compensator_info), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(transformer_end_info=create_transformer_end_info(False))
def test_schema_transformer_end_info(self, caplog, transformer_end_info):
    self._validate_schema(SchemaNetworks().network_services_of(TransformerEndInfo, transformer_end_info), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(transformer_tank_info=create_transformer_tank_info(False))
def test_schema_transformer_tank_info(self, caplog, transformer_tank_info):
    self._validate_schema(SchemaNetworks().network_services_of(TransformerTankInfo, transformer_tank_info), caplog)

# ************ IEC61968 ASSETS ************

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(asset_owner=create_asset_owner(False))
def test_schema_asset_owner(self, caplog, asset_owner):
    self._validate_schema(SchemaNetworks().network_services_of(AssetOwner, asset_owner), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(pole=create_pole(False))
def test_schema_pole(self, caplog, pole):
    self._validate_schema(SchemaNetworks().network_services_of(Pole, pole), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(streetlight=create_streetlight(False))
def test_schema_streetlight(self, caplog, streetlight):
    self._validate_schema(SchemaNetworks().network_services_of(Streetlight, streetlight), caplog)

# ************ IEC61968 CUSTOMERS ************

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(customer=create_customer(False))
def test_schema_customer(self, caplog, customer):
    self._validate_schema(SchemaNetworks().customer_services_of(Customer, customer), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(customer_agreement=create_customer_agreement(False))
def test_schema_customer_agreement(self, caplog, customer_agreement):
    self._validate_schema(SchemaNetworks().customer_services_of(CustomerAgreement, customer_agreement), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(pricing_structure=create_pricing_structure(False))
def test_schema_pricing_structure(self, caplog, pricing_structure):
    self._validate_schema(SchemaNetworks().customer_services_of(PricingStructure, pricing_structure), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(tariffs=create_tariffs(False))
def test_schema_tariffs(self, caplog, tariffs):
    self._validate_schema(SchemaNetworks().customer_services_of(Tariff, tariffs), caplog)

# ************ IEC61968 METERING ************

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(meter=create_meter(False))
def test_schema_meter(self, caplog, meter):
    self._validate_schema(SchemaNetworks().network_services_of(Meter, meter), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(usage_point=create_usage_point(False))
def test_schema_usage_point(self, caplog, usage_point):
    self._validate_schema(SchemaNetworks().network_services_of(UsagePoint, usage_point), caplog)

# ************ IEC61968 COMMON ************

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(location=create_location(False))
def test_schema_location(self, caplog, location):
    self._validate_schema(SchemaNetworks().network_services_of(Location, location), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(organisation=create_organisation(False))
def test_schema_organisation(self, caplog, organisation):
    self._validate_schema(SchemaNetworks().network_services_of(Organisation, organisation), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(organisation=create_organisation(False))
def test_schema_organisation_customer(self, caplog, organisation):
    self._validate_schema(SchemaNetworks().customer_services_of(Organisation, organisation), caplog)

# ************ IEC61968 OPERATIONS ************

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(operational_restriction=create_operational_restriction(False))
def test_schema_operational_restriction(self, caplog, operational_restriction):
    self._validate_schema(SchemaNetworks().network_services_of(OperationalRestriction, operational_restriction), caplog)

# ************ IEC61970 BASE AUXILIARY EQUIPMENT ************

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(fault_indicator=create_fault_indicator(False))
def test_schema_fault_indicator(self, caplog, fault_indicator):
    self._validate_schema(SchemaNetworks().network_services_of(FaultIndicator, fault_indicator), caplog)

# ************ IEC61970 BASE CORE ************

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(base_voltage=create_base_voltage(False))
def test_schema_base_voltage(self, caplog, base_voltage):
    self._validate_schema(SchemaNetworks().network_services_of(BaseVoltage, base_voltage), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(connectivity_node=create_connectivity_node(False))
def test_schema_connectivity_node(self, caplog, connectivity_node):
    self._validate_schema(SchemaNetworks().network_services_of(ConnectivityNode, connectivity_node), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(feeder=create_feeder(False))
def test_schema_feeder(self, caplog, feeder):
    self._validate_schema(SchemaNetworks().network_services_of(Feeder, feeder), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(geographical_region=create_geographical_region(False))
def test_schema_geographical_region(self, caplog, geographical_region):
    self._validate_schema(SchemaNetworks().network_services_of(GeographicalRegion, geographical_region), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(site=create_site(False))
def test_schema_site(self, caplog, site):
    self._validate_schema(SchemaNetworks().network_services_of(Site, site), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(sub_geographical_region=create_sub_geographical_region(False))
def test_schema_sub_geographical_region(self, caplog, sub_geographical_region):
    self._validate_schema(SchemaNetworks().network_services_of(SubGeographicalRegion, sub_geographical_region), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(substation=create_substation(False))
def test_schema_substation(self, caplog, substation):
    self._validate_schema(SchemaNetworks().network_services_of(Substation, substation), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(terminal=create_terminal(False))
def test_schema_terminal(self, caplog, terminal):
    self._validate_schema(SchemaNetworks().network_services_of(Terminal, terminal), caplog)

# ************ IEC61970 BASE DIAGRAM LAYOUT ************

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(diagram=create_diagram(False))
def test_schema_diagram(self, caplog, diagram):
    self._validate_schema(SchemaNetworks().diagram_services_of(Diagram, diagram), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(diagram_object=create_diagram_object(False))
def test_schema_diagram_object(self, caplog, diagram_object):
    self._validate_schema(SchemaNetworks().diagram_services_of(DiagramObject, diagram_object), caplog)

# ************ IEC61970 BASE EQUIVALENTS ************

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(equivalent_branch=create_equivalent_branch(False))
def test_schema_equivalent_branch(self, caplog, equivalent_branch):
    self._validate_schema(SchemaNetworks().network_services_of(EquivalentBranch, equivalent_branch), caplog)

# ************ IEC61970 BASE MEAS ************

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(accumulator=create_accumulator(False))
def test_schema_accumulator(self, caplog, accumulator):
    self._validate_schema(SchemaNetworks().network_services_of(Accumulator, accumulator), caplog)
    # Disabled in the original listing (note: `data` is not a parameter of this test):
    # self._validate_schema(SchemaNetworks().measurement_services_of(AccumulatorValue, data.draw(create_accumulator_value(False))), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(analog=create_analog(False))
def test_schema_analog(self, caplog, analog):
    self._validate_schema(SchemaNetworks().network_services_of(Analog, analog), caplog)
    # Disabled in the original listing (note: `data` is not a parameter of this test):
    # self._validate_schema(SchemaNetworks().measurement_services_of(AnalogValue, data.draw(create_analog_value(False))), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(control=create_control(False))
def test_schema_control(self, caplog, control):
    self._validate_schema(SchemaNetworks().network_services_of(Control, control), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(discrete=create_discrete(False))
def test_schema_discrete(self, caplog, discrete):
    self._validate_schema(SchemaNetworks().network_services_of(Discrete, discrete), caplog)
    # Disabled in the original listing (note: `data` is not a parameter of this test):
    # self._validate_schema(SchemaNetworks().measurement_services_of(DiscreteValue, data.draw(create_discrete_value(False))), caplog)

# ************ IEC61970 BASE SCADA ************

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(remote_control=create_remote_control(False))
def test_schema_remote_control(self, caplog, remote_control):
    self._validate_schema(SchemaNetworks().network_services_of(RemoteControl, remote_control), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(remote_source=create_remote_source(False))
def test_schema_remote_source(self, caplog, remote_source):
    self._validate_schema(SchemaNetworks().network_services_of(RemoteSource, remote_source), caplog)

# ************ IEC61970 BASE WIRES GENERATION PRODUCTION ************

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(battery_unit=create_battery_unit(False))
def test_schema_battery_unit(self, caplog, battery_unit):
    self._validate_schema(SchemaNetworks().network_services_of(BatteryUnit, battery_unit), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(photovoltaic_unit=create_photovoltaic_unit(False))
def test_schema_photovoltaic_unit(self, caplog, photovoltaic_unit):
    self._validate_schema(SchemaNetworks().network_services_of(PhotoVoltaicUnit, photovoltaic_unit), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(power_electronics_connection=create_power_electronics_connection(False))
def test_schema_power_electronics_connection(self, caplog, power_electronics_connection):
    self._validate_schema(SchemaNetworks().network_services_of(PowerElectronicsConnection, power_electronics_connection), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(pec_phase=create_power_electronics_connection_phase(False))
def test_schema_power_electronics_connection_phase(self, caplog, pec_phase):
    self._validate_schema(SchemaNetworks().network_services_of(PowerElectronicsConnectionPhase, pec_phase), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(power_electronics_wind_unit=create_power_electronics_wind_unit(False))
def test_schema_power_electronics_wind_unit(self, caplog, power_electronics_wind_unit):
    self._validate_schema(SchemaNetworks().network_services_of(PowerElectronicsWindUnit, power_electronics_wind_unit), caplog)

# ************ IEC61970 BASE WIRES ************

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(ac_line_segment=create_ac_line_segment(False))
def test_schema_ac_line_segment(self, caplog, ac_line_segment):
    self._validate_schema(SchemaNetworks().network_services_of(AcLineSegment, ac_line_segment), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(breaker=create_breaker(False))
def test_schema_breaker(self, caplog, breaker):
    self._validate_schema(SchemaNetworks().network_services_of(Breaker, breaker), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(busbar_section=create_busbar_section(False))
def test_schema_busbar_section(self, caplog, busbar_section):
    self._validate_schema(SchemaNetworks().network_services_of(BusbarSection, busbar_section), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(disconnector=create_disconnector(False))
def test_schema_disconnector(self, caplog, disconnector):
    self._validate_schema(SchemaNetworks().network_services_of(Disconnector, disconnector), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(energy_consumer=create_energy_consumer(False))
def test_schema_energy_consumer(self, caplog, energy_consumer):
    # Discard generated examples in which two phase objects share the same `phase` value.
    phases = list(energy_consumer.phases)
    assume(len({it.phase for it in phases}) == len(phases))
    self._validate_schema(SchemaNetworks().network_services_of(EnergyConsumer, energy_consumer), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(energy_consumer_phase=create_energy_consumer_phase(False))
def test_schema_energy_consumer_phase(self, caplog, energy_consumer_phase):
    self._validate_schema(SchemaNetworks().network_services_of(EnergyConsumerPhase, energy_consumer_phase), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(energy_source=create_energy_source(False))
def test_schema_energy_source(self, caplog, energy_source):
    # Discard generated examples in which two phase objects share the same `phase` value.
    phases = list(energy_source.phases)
    assume(len({it.phase for it in phases}) == len(phases))
    self._validate_schema(SchemaNetworks().network_services_of(EnergySource, energy_source), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(energy_source_phase=create_energy_source_phase(False))
def test_schema_energy_source_phase(self, caplog, energy_source_phase):
    self._validate_schema(SchemaNetworks().network_services_of(EnergySourcePhase, energy_source_phase), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(fuse=create_fuse(False))
def test_schema_fuse(self, caplog, fuse):
    self._validate_schema(SchemaNetworks().network_services_of(Fuse, fuse), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(jumper=create_jumper(False))
def test_schema_jumper(self, caplog, jumper):
    self._validate_schema(SchemaNetworks().network_services_of(Jumper, jumper), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(junction=create_junction(False))
def test_schema_junction(self, caplog, junction):
    self._validate_schema(SchemaNetworks().network_services_of(Junction, junction), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(linear_shunt_compensator=create_linear_shunt_compensator(False))
def test_schema_linear_shunt_compensator(self, caplog, linear_shunt_compensator):
    self._validate_schema(SchemaNetworks().network_services_of(LinearShuntCompensator, linear_shunt_compensator), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(load_break_switch=create_load_break_switch(False))
def test_schema_load_break_switch(self, caplog, load_break_switch):
    self._validate_schema(SchemaNetworks().network_services_of(LoadBreakSwitch, load_break_switch), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(per_length_sequence_impedance=create_per_length_sequence_impedance(False))
def test_schema_per_length_sequence_impedance(self, caplog, per_length_sequence_impedance):
    self._validate_schema(SchemaNetworks().network_services_of(PerLengthSequenceImpedance, per_length_sequence_impedance), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(power_transformer=create_power_transformer(False))
def test_schema_power_transformer(self, caplog, power_transformer):
    self._validate_schema(SchemaNetworks().network_services_of(PowerTransformer, power_transformer), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(power_transformer_end=create_power_transformer_end(False))
def test_schema_power_transformer_end(self, caplog, power_transformer_end):
    self._validate_schema(SchemaNetworks().network_services_of(PowerTransformerEnd, power_transformer_end), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(ratio_tap_changer=create_ratio_tap_changer(False))
def test_schema_ratio_tap_changer(self, caplog, ratio_tap_changer):
    self._validate_schema(SchemaNetworks().network_services_of(RatioTapChanger, ratio_tap_changer), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(recloser=create_recloser(False))
def test_schema_recloser(self, caplog, recloser):
    self._validate_schema(SchemaNetworks().network_services_of(Recloser, recloser), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(transformer_star_impedance=create_transformer_star_impedance(False))
def test_schema_transformer_star_impedance(self, caplog, transformer_star_impedance):
    self._validate_schema(SchemaNetworks().network_services_of(TransformerStarImpedance, transformer_star_impedance), caplog)

# ************ IEC61970 InfIEC61970 ************

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(circuit=create_circuit(False))
def test_schema_circuit(self, caplog, circuit):
    self._validate_schema(SchemaNetworks().network_services_of(Circuit, circuit), caplog)

@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(loop=create_loop(False))
def test_schema_loop(self, caplog, loop):
    self._validate_schema(SchemaNetworks().network_services_of(Loop, loop), caplog)

# noinspection PyShadowingNames
@log_on_failure_decorator
@settings(deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow])
@given(lv_feeder=create_lv_feeder(False))
def test_schema_lv_feeder(self, caplog, lv_feeder):
    self._validate_schema(SchemaNetworks().network_services_of(LvFeeder, lv_feeder), caplog)

# ************ Services ************

@log_on_failure_decorator
def test_metadata_data_source_schema(self, caplog):
    """Run the schema round trip on `SchemaNetworks().create_data_source_test_services()`."""
    self._validate_schema(SchemaNetworks().create_data_source_test_services(), caplog)

@log_on_failure_decorator
def test_name_and_name_type_schema(self, caplog):
    """Run the schema round trip on `SchemaNetworks().create_name_test_services()`."""
    self._validate_schema(SchemaNetworks().create_name_test_services(), caplog)

@log_on_failure_decorator
def test_error_on_duplicate_id_added_to_customer_service(self, caplog):
    """Pre-add the same Customer to the read services and expect the duplicate-MRID failure."""
    write_services = Services()
    read_services = Services()
    customer = Customer(mrid="customer1")
    write_services.customer_service.add(customer)
    read_services.customer_service.add(customer)
    self._test_duplicate_mrid_error(write_services, read_services, read_services.customer_service, customer, caplog)

@log_on_failure_decorator
def test_error_on_duplicate_id_added_to_diagram_service(self, caplog):
    """Pre-add the same Diagram to the read services and expect the duplicate-MRID failure."""
    write_services = Services()
    read_services = Services()
    diagram = Diagram(mrid="diagram1")
    write_services.diagram_service.add(diagram)
    read_services.diagram_service.add(diagram)
    self._test_duplicate_mrid_error(write_services, read_services, read_services.diagram_service, diagram, caplog)

@log_on_failure_decorator
def test_error_on_duplicate_id_added_to_network_service(self, caplog):
    """Pre-add the same Junction to the read services and expect the duplicate-MRID failure."""
    write_services = Services()
    read_services = Services()
    junction = Junction(mrid="junction1")
    write_services.network_service.add(junction)
    read_services.network_service.add(junction)
    self._test_duplicate_mrid_error(write_services, read_services, read_services.network_service, junction, caplog)

@log_on_failure_decorator
def test_only_loads_street_address_fields_if_required(self, caplog):
    """Write a Location whose address holds only default town/street details and
    check that it is read back equal to a default `StreetAddress()`."""
    write_services = Services()
    # noinspection PyArgumentList
    location = Location(mrid="loc1", main_address=StreetAddress(town_detail=TownDetail(), street_detail=StreetDetail()))
    write_services.network_service.add(location)

    # The original created `read_services` twice; only this instance was ever used.
    read_services = Services()

    def validate_read(success):
        assert success
        loc = read_services.network_service.get("loc1", Location)
        assert loc.main_address == StreetAddress()

    self._test_write_read(
        write_services,
        read_services,
        validate_read,
        caplog
    )

def _test_duplicate_mrid_error(
    self,
    write_services: Services,
    read_services: Services,
    service_with_duplicate: BaseService,
    duplicate: IdentifiedObject,
    caplog
):
    """Run `_test_write_read` and require that it reports failure with the
    duplicate-MRID message for `duplicate` present in the captured log text.

    :param write_services: services passed to `_test_write_read` as the data to write.
    :param read_services: services passed to `_test_write_read` as the read target.
    :param service_with_duplicate: service whose `name` appears in the expected message.
    :param duplicate: object interpolated into the expected message.
    :param caplog: log-capture fixture whose `text` is searched for the message.
    """
    expected_error = f"Failed to load {duplicate}. Unable to add to service '{service_with_duplicate.name}': duplicate MRID"

    def validate_read(success):
        assert not success
        assert expected_error in caplog.text

    self._test_write_read(
        write_services,
        read_services,
        validate_read,
        caplog
    )

def _validate_schema(self, expected: Services, caplog):
    """Check that `expected` comes back unchanged from `_test_write_read`.

    The read-side callback asserts success, compares the metadata and the
    network, diagram and customer services against `expected`, and checks that
    every expected DiagramObject with an `identified_object_mrid` can be looked
    up by that mRID in the read diagram service.
    """
    for location in expected.network_service.objects(Location):
        assume_non_blank_street_address_details(location.main_address)

    read_services = Services()

    def validate_read(success):
        assert success
        self._validate_metadata(read_services.metadata_collection, expected.metadata_collection)
        self._validate_service(read_services.network_service, expected.network_service, NetworkServiceComparator())
        self._validate_service(read_services.diagram_service, expected.diagram_service, DiagramServiceComparator())
        self._validate_service(read_services.customer_service, expected.customer_service, CustomerServiceComparator())
        for d_obj in expected.diagram_service.objects(DiagramObject):
            if d_obj.identified_object_mrid is not None:
                assert read_services.diagram_service.get_diagram_objects(d_obj.identified_object_mrid)

    # NOTE: the listing is truncated in the middle of the following call; the
    # remaining arguments are not visible, so it is reproduced as a comment.
    # self._test_write_read(
    #     expected,
    #     read_services,...

Full Screen

Full Screen

__main__.py

Source: __main__.py (GitHub)

copy

Full Screen

# ...
# NOTE: the listing begins inside the TOML-loading helper (original lines
# 29-32). Its beginning is outside the excerpt, so the visible tail is kept
# verbatim as a comment rather than reconstructed:
#         ret = pytoml.load(file)
#     except pytoml.TomlError as e:
#         raise UserError(f'{file.name}: TOML is malformed: {e}')
#     return ret


def _validate_schema(d, schema, fname):
    """Validate the dict `d` against `schema` in place, filling in defaults.

    `schema` maps a parameter name to a `(type, default)` pair. A default of
    `None` makes the parameter required.

    :param d: dictionary to validate; missing optional keys are added to it.
    :param schema: mapping of name -> (expected type, default value).
    :param fname: file name used as the prefix of error messages.
    :raises UserError: if `d` is not a dict, a required parameter is missing,
        or a value has the wrong type.
    """
    import copy

    # A scalar where a table is expected would otherwise fail on `d.get`
    # with an AttributeError instead of a user-facing error.
    if not isinstance(d, dict):
        raise UserError(f'{fname}: expected a dictionary')
    for name, (typ, defval) in schema.items():
        val = d.get(name, defval)
        if val is None:
            raise UserError(f'{fname}: required parameter {name} not found')
        if not isinstance(val, typ):
            raise UserError(f'{fname}: {name}: type mismatch')
        if name not in d:
            # Copy the default: the schema's `{}` / `[]` objects are shared
            # module-level state and must not end up inside a loaded spec.
            d[name] = copy.deepcopy(defval)


_outfilespec_schema = {
    'name': (str, None),
    'outputspecs': (list, None)
}


def get_outfilespec(ofs, fname):
    """Validate the output-file spec `ofs` and build a `launch.OutFileSpec` from it."""
    _validate_schema(ofs, _outfilespec_schema, fname)
    outputspecs = [get_outputspec(spec, fname=fname) for spec in ofs['outputspecs']]
    return launch.OutFileSpec(ofs['name'], outputspecs)


_outputspec_schema = {
    'vartypes': (dict, None),
    'regex': (str, None),
}


def get_outputspec(spec, fname):
    """Validate the output spec `spec` and build a `launch.OutputSpec` from it."""
    _validate_schema(spec, _outputspec_schema, fname)
    _validate_param_types(spec['vartypes'], fname)
    return launch.OutputSpec(spec['vartypes'], spec['regex'])


def _validate_param_types(params, fname):
    """Check that `params` maps valid identifiers to entries of `launch.types`.

    :raises UserError: if `params` is not a dict, a key does not match
        `ParamSpecParser.ident_regex`, or a value is not in `launch.types`.
    """
    if not isinstance(params, dict):
        raise UserError(f'{fname}: params must be a dictionary')
    for k, v in params.items():
        if not re.match(ParamSpecParser.ident_regex, k):
            raise UserError(
                f'{fname}: parameter {k} is not a valid identifier')
        if v not in launch.types:
            raise UserError(
                f'{fname}: parameter {k}: {v} is not one of int, float, str')


_machine_spec_schema = {
    'host': (str, None),
    'username': (str, None),
    'port': (int, 22)
}


def load_machine_spec(file):
    """Load a machine spec from the TOML file object `file` as a `launch.MachineSpec`."""
    d = _toml_from_file(file)
    _validate_schema(d, _machine_spec_schema, file.name)
    return launch.MachineSpec(**d)


_launch_profile_schema = {
    'machine': (str, None),
    'start_cmd': (str, None),
    'poll_cmd': (str, None),
    'cancel_cmd': (str, None),
    'base_dir': (str, None),
    'params': (dict, {}),
    'param_order': (list, []),
    'out_file_specs': (list, [])
}


def load_launch_profile(file):
    """Load a launch profile from the TOML file object `file`.

    The `machine` entry names a second file, which is opened via `_open_file`
    and loaded with `load_machine_spec`.

    :raises UserError: if validation fails or `param_order` names a parameter
        that is not a key of `params`.
    """
    d = _toml_from_file(file)
    _validate_schema(d, _launch_profile_schema, file.name)
    if not all(x in d['params'] for x in d['param_order']):
        raise UserError(f'{file.name}: unknown parameters in param_order')
    with _open_file(d['machine'], file.name) as f:
        d['machine'] = load_machine_spec(f)
    d['out_file_specs'] = [
        get_outfilespec(ofs, fname=file.name) for ofs in d['out_file_specs']]
    return launch.LaunchProfile(**d)


_infilespec_schema = {
    'name': (str, None),
    'template': (str, None)
}


def get_infilespec(d, fname):
    """Validate the input-file spec `d`, read its template file and build a `launch.InFileSpec`."""
    _validate_schema(d, _infilespec_schema, fname)
    with _open_file(d['template'], fname) as f:
        template = f.read()
    return launch.InFileSpec(d['name'], template)


_prog_spec_schema = {
    'params': (dict, None),
    'args': (list, None),
    'stdout': (list, []),
    'out_file_specs': (list, []),
    'in_file_specs': (list, [])
}


def load_prog_spec(file):
    """Load a program spec from the TOML file object `file` as a `launch.ProgSpec`.

    :raises UserError: if validation fails or `args` contains a non-string.
    """
    d = _toml_from_file(file)
    _validate_schema(d, _prog_spec_schema, file.name)
    _validate_param_types(d['params'], file.name)
    if not all(isinstance(x, str) for x in d['args']):
        raise UserError(f'{file.name}: args: type mismatch')
    d['stdout'] = [get_outputspec(spec, fname=file.name) for spec in d['stdout']]
    d['out_file_specs'] = [
        get_outfilespec(ofs, fname=file.name) for ofs in d['out_file_specs']]
    d['in_file_specs'] = [
        get_infilespec(ifs, fname=file.name) for ifs in d['in_file_specs']]
    return launch.ProgSpec(**d)


def write_launch_specs(file, launch_profile_file, prog_spec_file, executable, launch_specs):
    """Dump the launch specs, converted with `attr.asdict`, and the three given
    file/executable names to `file` as TOML."""
    all_specs = [attr.asdict(spec) for spec in launch_specs]
    pytoml.dump({'launch_profile': launch_profile_file,
                 'prog_spec': prog_spec_file,
                 'executable': executable,
                 'specs': all_specs}, file)


_infile_creation_spec_schema = {
    'name': (str, None),
    'contents': (str, None)
}


def get_infile_creation_spec(spec, fname):
    """Validate `spec` and build a `launch.InFileCreationSpec` from it."""
    _validate_schema(spec, _infile_creation_spec_schema, fname)
    return launch.InFileCreationSpec(**spec)


_launch_spec_schema = {
    'args': (list, None),
    'params': (dict, None),
    'infiles': (list, None)
}

_launch_spec_file_schema = {
    'launch_profile': (str, None),
    'prog_spec': (str, None),
    'executable': (str, None),
    'specs': (list, None)
}


def load_launch_specs(file):
    """Load a launch-spec file written in the layout used by `write_launch_specs`.

    :returns: tuple `(launch_profile, prog_spec, executable, spec_list)`, where
        the profile and program spec are loaded from the files named in `file`.
    :raises UserError: if any part fails validation.
    """
    obj = _toml_from_file(file)
    _validate_schema(obj, _launch_spec_file_schema, file.name)
    with _open_file(obj['launch_profile'], file.name) as f:
        launch_profile = load_launch_profile(f)
    with _open_file(obj['prog_spec'], file.name) as f:
        prog_spec = load_prog_spec(f)
    spec_list = []
    for d in obj['specs']:
        _validate_schema(d, _launch_spec_schema, file.name)
        ifcs = [get_infile_creation_spec(ifc, fname=file.name) for ifc in d['infiles']]
        spec_list.append(launch.LaunchSpec(d['args'], d['params'], ifcs))
    return launch_profile, prog_spec, obj['executable'], spec_list


def write_results(file, results):
    """Dump `results` (a mapping of index -> result object) to `file` as TOML
    under the key `result`."""
    ret = [
        {
            'index': index,
            'jobid': str(res.jobid),
            'state': {'status': res.state.status.name, 'exit_code': res.state.exit_code},
            'cwd': res.cwd
        }
        for index, res in results.items()
    ]
    pytoml.dump({'result': ret}, file)


_jobstate_schema = {
    'status': (str, None),
    'exit_code': (int, None)
}


def get_job_state(obj, fname):
    """Validate the job-state dict `obj` and build a `jobs.JobState` from it."""
    _validate_schema(obj, _jobstate_schema, fname)
    # NOTE(review): a status name that is not a member of jobs.JobStateType
    # presumably raises KeyError here rather than UserError - confirm whether
    # callers rely on that before changing it.
    status = jobs.JobStateType[obj['status']]
    return jobs.JobState(status, obj['exit_code'])


_result_schema = {
    'index': (int, None),
    'jobid': (str, None),
    'state': (dict, None),
    'cwd': (str, None)
}


def load_results(file):
    """Load the results written by `write_results` from the TOML file object `file`.

    :returns: dict mapping each result's `index` to a `launch.Result`.
    :raises UserError: if the file has no list under `result` or an entry
        fails validation.
    """
    obj = _toml_from_file(file)
    if not isinstance(obj, dict):
        raise UserError('invalid result spec')
    try:
        dlist = obj['result']
    except KeyError:
        # The KeyError carries no extra information for the user.
        raise UserError('invalid result spec') from None
    if not isinstance(dlist, list):
        raise UserError('invalid result spec')
    results = {}
    for d in dlist:
        _validate_schema(d, _result_schema, file.name)
        results[d['index']] = launch.Result(
            d['jobid'],
            get_job_state(d['state'], file.name),
            d['cwd'])
    return results


def genparams(args):
    # NOTE: this function runs past the end of the listing; the visible
    # statements are reproduced unchanged.
    site_spec = load_launch_profile(args.launch_profile)
    prog_spec = load_prog_spec(args.prog_spec)
    try:
        params = gen_params(args.param_spec.read())
    except ParamSpecError as e:
        raise UserError(str(e))
    params = list(params)
    print(f'Number of runs: {len(params)}', file=sys.stderr)
    # ... (listing truncated here)

Full Screen

Full Screen

__init__.py

Source:__init__.py Github

copy

Full Screen

# NOTE: this excerpt begins mid-statement; the visible tail of that statement
# is preserved here as a comment rather than guessed at:
#     ... TX_SCHEMA_VERSION)
_, TX_SCHEMA_TRANSFER = _load_schema('transaction_transfer_' +
                                     TX_SCHEMA_VERSION)
VOTE_SCHEMA_PATH, VOTE_SCHEMA = _load_schema('vote')


def _validate_schema(schema, body):
    """Validate ``body`` against ``schema``.

    ``schema`` is indexed as a pair: ``schema[1]`` is asked to validate the
    rapidjson-serialized body first, and ``schema[0]`` is passed to
    ``jsonschema.validate`` only if that first check fails.

    Raises:
        SchemaValidationError: if validation fails.
    """
    # Note
    #
    # Schema validation is currently the major CPU bottleneck of
    # BigchainDB. The `jsonschema` library validates python data structures
    # directly and produces nice error messages, but validation takes 4+ ms
    # per transaction which is pretty slow. The rapidjson library validates
    # much faster at 1.5ms, however it produces _very_ poor error messages.
    # For this reason we use both: rapidjson as the optimistic pathway and
    # jsonschema as a fallback when there is a failure, so that we can
    # produce a helpful error message.
    try:
        schema[1].validate(rapidjson.dumps(body))
    except ValueError as fast_err:
        try:
            jsonschema.validate(body, schema[0])
        except jsonschema.ValidationError as detailed_err:
            raise SchemaValidationError(str(detailed_err)) from detailed_err
        else:
            # The two validators disagree: report it, then fall back to the
            # rapidjson error since that is the only one available.
            logger.warning('code problem: jsonschema did not raise an exception, wheras rapidjson raised %s', fast_err)
            raise SchemaValidationError(str(fast_err)) from fast_err


def validate_transaction_schema(tx):
    """Validate a transaction dict.

    The transaction is checked against TX_SCHEMA_COMMON, which holds the
    properties shared by every transaction type, and then against
    TX_SCHEMA_TRANSFER or TX_SCHEMA_CREATE, which add further constraints.
    """
    _validate_schema(TX_SCHEMA_COMMON, tx)
    is_transfer = tx['operation'] == 'TRANSFER'
    specific_schema = TX_SCHEMA_TRANSFER if is_transfer else TX_SCHEMA_CREATE
    _validate_schema(specific_schema, tx)


def validate_vote_schema(vote):
    """Validate a vote dict"""
    # NOTE: the excerpt is truncated here; the body of this function is not
    # shown in the source.
    ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run pandera automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful