How to use output_reader method in fMBT

Best Python code snippet using fMBT_python

output_ui.py

Source:output_ui.py Github

copy

Full Screen

1from __future__ import absolute_import2from builtins import str3from builtins import range4from builtins import object5from qgis.PyQt.QtWidgets import QDialog, QVBoxLayout, QFrame, QHBoxLayout, QLabel, QLineEdit, QToolButton, QTabWidget,\6 QWidget, QPushButton, QCheckBox, QGroupBox, QRadioButton, QFormLayout, QSpacerItem, QSizePolicy, QComboBox,\7 QFileDialog, QMessageBox, QApplication, QPlainTextEdit8from qgis.PyQt.QtGui import QCursor, QTextCursor9from qgis.PyQt.QtCore import Qt10from qgis.core import QgsProject, edit, Qgis11from .graphs import StaticMplCanvas12from ..geo_utils.utils import LayerUtils13from ..tools.parameters import Parameters, ConfigFile14from ..model.network import Junction, Reservoir, Tank, Pipe, Pump, Valve, Node15from ..model.binary_out_reader import BinaryOutputReader, OutputParamCodes16from ..model.options_report import Options, Quality17from ..tools.select_tool import SelectTool18from ..tools.data_stores import MemoryDS19from ..rendering.symbology import LinkSymbology, NodeSymbology20import codecs21import math22min_width = 60023min_height = 40024class OutputAnalyserDialog(QDialog):25 def __init__(self, iface, parent, params):26 QDialog.__init__(self, parent)27 self.iface = iface28 self.parent = parent29 self.params = params30 self.output_reader = None31 self.tool = None32 self.element_ids_nodes = None33 self.element_ids_links = None34 self.nodes_lay = None35 self.links_lay = None36 self.setWindowTitle(Parameters.plug_in_name)37 # Selection changed listeners38 self.params.junctions_vlay.selectionChanged.connect(self.feature_sel_changed)39 self.params.reservoirs_vlay.selectionChanged.connect(self.feature_sel_changed)40 self.params.tanks_vlay.selectionChanged.connect(self.feature_sel_changed)41 self.params.pipes_vlay.selectionChanged.connect(self.feature_sel_changed)42 self.params.pumps_vlay.selectionChanged.connect(self.feature_sel_changed)43 self.params.valves_vlay.selectionChanged.connect(self.feature_sel_changed)44 # self.setMinimumWidth(min_width)45 # self.setMinimumHeight(min_height)46 fra_main_lay = QVBoxLayout(self)47 self.fra_out_file = QFrame(self)48 fra_out_file_lay = QHBoxLayout(self.fra_out_file)49 self.lbl_out_file = QLabel('Simulation output file:')50 self.txt_out_file = QLineEdit('')51 self.txt_out_file.setReadOnly(True)52 self.btn_out_file = QToolButton()53 self.btn_out_file.setText('...')54 self.btn_out_file.clicked.connect(self.btn_out_file_clicked)55 fra_out_file_lay.addWidget(self.lbl_out_file)56 fra_out_file_lay.addWidget(self.txt_out_file)57 fra_out_file_lay.addWidget(self.btn_out_file)58 self.tab_widget = QTabWidget(self)59 # Graphs tab ---------------------------------------------------------------------------------------------------60 self.tab_graphs = QWidget()61 tab_graphs_lay = QHBoxLayout(self.tab_graphs)62 # Left frame63 self.fra_graphs_left = QFrame()64 self.fra_graphs_left.setMaximumWidth(100)65 fra_graphs_left_lay = QVBoxLayout(self.fra_graphs_left)66 self.btn_sel_element = QPushButton('Pick')67 self.btn_sel_element.clicked.connect(self.btn_sel_element_clicked)68 fra_graphs_left_lay.addWidget(self.btn_sel_element)69 # Nodes70 self.grb_nodes = QGroupBox(u'Nodes')71 lay_grb_nodes = QVBoxLayout(self.grb_nodes)72 self.chk_node_demand = QCheckBox('Demand')73 lay_grb_nodes.addWidget(self.chk_node_demand)74 self.chk_node_head = QCheckBox('Head')75 lay_grb_nodes.addWidget(self.chk_node_head)76 self.chk_node_pressure = QCheckBox('Pressure')77 lay_grb_nodes.addWidget(self.chk_node_pressure)78 self.chk_node_quality = QCheckBox('Quality')79 lay_grb_nodes.addWidget(self.chk_node_quality)80 fra_graphs_left_lay.addWidget(self.grb_nodes)81 # Links82 self.grb_links = QGroupBox(u'Links')83 lay_grb_links = QVBoxLayout(self.grb_links)84 self.chk_link_flow = QCheckBox('Flow')85 lay_grb_links.addWidget(self.chk_link_flow)86 self.chk_link_velocity = QCheckBox('Velocity')87 lay_grb_links.addWidget(self.chk_link_velocity)88 self.chk_link_headloss = QCheckBox('Headloss')89 lay_grb_links.addWidget(self.chk_link_headloss)90 self.chk_link_quality = QCheckBox('Quality')91 lay_grb_links.addWidget(self.chk_link_quality)92 fra_graphs_left_lay.addWidget(self.grb_links)93 self.btn_draw_graph = QPushButton('Draw')94 self.btn_draw_graph.clicked.connect(self.draw_graphs)95 fra_graphs_left_lay.addWidget(self.btn_draw_graph)96 self.spacer = QSpacerItem(20, 40, QSizePolicy.Minimum, QSizePolicy.Expanding)97 fra_graphs_left_lay.addItem(self.spacer)98 tab_graphs_lay.addWidget(self.fra_graphs_left)99 # Right frame100 self.fra_graphs_right = QFrame()101 fra_graphs_right_lay = QVBoxLayout(self.fra_graphs_right)102 fra_graphs_right_lay.setContentsMargins(0, 0, 0, 0)103 self.static_canvas = StaticMplCanvas(self.fra_graphs_right, width=5, height=4, dpi=100)104 fra_graphs_right_lay.addWidget(self.static_canvas)105 tab_graphs_lay.addWidget(self.fra_graphs_right)106 # lay.addWidget(self.button)107 self.tab_widget.addTab(self.tab_graphs, 'Graphs')108 # Maps tab -----------------------------------------------------------------------------------------------------109 self.tab_maps = QWidget()110 tab_maps_lay = QHBoxLayout(self.tab_maps)111 # Left frame112 self.fra_maps_left = QFrame()113 self.fra_maps_left.setMaximumWidth(200)114 fra_maps_left_lay = QVBoxLayout(self.fra_maps_left)115 self.grb_maps = QGroupBox(u'Variable')116 grb_maps_lay = QVBoxLayout(self.grb_maps)117 self.rad_maps_node_demand = QRadioButton(u'Node demand')118 grb_maps_lay.addWidget(self.rad_maps_node_demand)119 self.rad_maps_node_head = QRadioButton(u'Node head')120 grb_maps_lay.addWidget(self.rad_maps_node_head)121 self.rad_maps_node_pressure = QRadioButton(u'Node pressure')122 grb_maps_lay.addWidget(self.rad_maps_node_pressure)123 self.rad_maps_node_quality = QRadioButton(u'Node quality')124 grb_maps_lay.addWidget(self.rad_maps_node_quality)125 self.rad_maps_link_flow = QRadioButton(u'Link flow')126 grb_maps_lay.addWidget(self.rad_maps_link_flow)127 self.rad_maps_link_velocity = QRadioButton(u'Link velocity')128 grb_maps_lay.addWidget(self.rad_maps_link_velocity)129 self.rad_maps_link_headloss = QRadioButton(u'Link headloss')130 grb_maps_lay.addWidget(self.rad_maps_link_headloss)131 self.rad_maps_link_quality = QRadioButton(u'Link quality')132 grb_maps_lay.addWidget(self.rad_maps_link_quality)133 fra_maps_left_lay.addWidget(self.grb_maps)134 fra_maps_left_lay.addItem(self.spacer)135 tab_maps_lay.addWidget(self.fra_maps_left)136 # Right maps frame137 self.fra_maps_right = QFrame()138 fra_maps_right_lay = QVBoxLayout(self.fra_maps_right)139 self.fra_maps_right_time = QFrame()140 fra_maps_right_time_lay = QFormLayout(self.fra_maps_right_time)141 self.lbl_map_times = QLabel(u'Period [h]:')142 self.cbo_map_times = QComboBox()143 fra_maps_right_time_lay.addRow(self.lbl_map_times, self.cbo_map_times)144 fra_maps_right_lay.addWidget(self.fra_maps_right_time)145 self.btn_draw_map = QPushButton(u'Draw map')146 self.btn_draw_map.clicked.connect(self.draw_maps)147 fra_maps_right_lay.addWidget(self.btn_draw_map)148 fra_maps_right_lay.addItem(self.spacer)149 tab_maps_lay.addWidget(self.fra_maps_right)150 self.tab_widget.addTab(self.tab_maps, 'Maps')151 # # Add to main152 fra_main_lay.addWidget(self.fra_out_file)153 fra_main_lay.addWidget(self.tab_widget)154 self.setup()155 self.initialize()156 # self.read_outputs()157 # Set size158 self.setMinimumWidth(self.tab_graphs.width())159 self.setMinimumHeight(self.tab_graphs.height())160 def setup(self):161 pass162 def btn_out_file_clicked(self):163 config_file = ConfigFile(Parameters.config_file_path)164 out_file, __ = QFileDialog.getOpenFileName(165 self,166 'Select out file',167 config_file.get_last_out_file(),168 'Out files (*.out)')169 if out_file is None or out_file == '':170 return171 config_file.set_last_out_file(out_file)172 self.txt_out_file.setText(out_file)173 self.read_outputs()174 if self.output_reader is None:175 return176 # Fill times combo177 self.cbo_map_times.clear()178 for period_s in self.output_reader.period_results.keys():179 text = self.seconds_to_string(180 period_s,181 self.output_reader.sim_duration_secs,182 self.output_reader.report_time_step_secs)183 self.cbo_map_times.addItem(text, period_s)184 # Activate widgets185 self.btn_sel_element.setEnabled(self.output_reader is not None)186 self.btn_draw_graph.setEnabled(self.output_reader is not None)187 self.grb_maps.setEnabled(self.output_reader is not None)188 self.btn_draw_map.setEnabled(self.output_reader is not None)189 def initialize(self):190 # Graphs191 self.grb_nodes.setEnabled(False)192 self.grb_links.setEnabled(False)193 self.btn_sel_element.setEnabled(self.output_reader is not None)194 self.btn_draw_graph.setEnabled(self.output_reader is not None)195 # Maps196 self.grb_maps.setEnabled(self.output_reader is not None)197 self.rad_maps_node_demand.setChecked(True)198 self.btn_draw_map.setEnabled(self.output_reader is not None)199 def feature_sel_changed(self):200 is_nodes = False201 sel_junctions = self.params.junctions_vlay.selectedFeatureCount()202 sel_reservoirs = self.params.reservoirs_vlay.selectedFeatureCount()203 sel_tanks = self.params.tanks_vlay.selectedFeatureCount()204 if sel_junctions > 0 or sel_reservoirs > 0 or sel_tanks > 0:205 is_nodes = True206 self.grb_nodes.setEnabled(is_nodes)207 is_links = False208 sel_pipes = self.params.pipes_vlay.selectedFeatureCount()209 sel_pumps = self.params.pumps_vlay.selectedFeatureCount()210 sel_valves = self.params.valves_vlay.selectedFeatureCount()211 if sel_pipes > 0 or sel_pumps > 0 or sel_valves > 0:212 is_links = True213 self.grb_links.setEnabled(is_links)214 def read_outputs(self):215 try:216 QApplication.setOverrideCursor(Qt.WaitCursor)217 self.output_reader = BinaryOutputReader()218 self.output_reader.read(self.txt_out_file.text())219 QApplication.restoreOverrideCursor()220 # Check if output compatible with loaded project221 compatible = True222 out_nodes_nr = self.output_reader.nodes_nr223 out_tanks_reservs_nr = self.output_reader.tanks_reservs_nr224 out_juncts_nr = out_nodes_nr - out_tanks_reservs_nr225 out_links_nr = self.output_reader.links_nr226 out_pumps_nr = self.output_reader.pumps_nr227 out_valves_nr = self.output_reader.valves_nr228 out_pipes_nr = out_links_nr - out_pumps_nr - out_valves_nr229 if out_juncts_nr != self.params.junctions_vlay.featureCount():230 compatible = False231 if out_tanks_reservs_nr != (self.params.reservoirs_vlay.featureCount() + self.params.tanks_vlay.featureCount()):232 compatible = False233 if out_pipes_nr != self.params.pipes_vlay.featureCount():234 compatible = False235 if out_valves_nr != self.params.valves_vlay.featureCount():236 compatible = False237 if out_pumps_nr != self.params.pumps_vlay.featureCount():238 compatible = False239 if not compatible:240 message = 'The out file appears to incompatible with the actual project layers.'241 QMessageBox.warning(242 self,243 Parameters.plug_in_name,244 message,245 QMessageBox.Ok)246 self.output_reader = None247 self.txt_out_file.setText('')248 else:249 # Message after reading completed250 message = 'Out file loaded: ' + str(out_nodes_nr) + ' nodes, ' + str(out_links_nr) + ' links found.'251 # Clear refs to output layer252 self.params.out_lay_node_demand = None253 self.params.out_lay_node_head = None254 self.params.out_lay_node_pressure = None255 self.params.out_lay_node_quality = None256 self.params.out_lay_link_flow = None257 self.params.out_lay_link_velocity = None258 self.params.out_lay_link_headloss = None259 self.params.out_lay_link_quality = None260 QMessageBox.information(261 self,262 Parameters.plug_in_name,263 message,264 QMessageBox.Ok)265 finally:266 # self.iface.messageBar().pushWarning(267 # Parameters.plug_in_name,268 # 'Error while reading output file.') # TODO: softcode269 # self.output_reader = None270 # self.txt_out_file.setText('')271 QApplication.restoreOverrideCursor()272 def btn_sel_element_clicked(self):273 if self.output_reader is None:274 self.iface.messageBar().pushMessage(275 Parameters.plug_in_name,276 'Please select the simulation out file.',277 Qgis.Warning,278 5) # TODO: softcode279 return280 self.tool = SelectTool(self, self.params)281 self.iface.mapCanvas().setMapTool(self.tool)282 cursor = QCursor()283 cursor.setShape(Qt.ArrowCursor)284 self.iface.mapCanvas().setCursor(cursor)285 def draw_graphs(self):286 # Get selected features287 self.element_ids_nodes = []288 for junction_feat in self.params.junctions_vlay.selectedFeatures():289 self.element_ids_nodes.append(junction_feat.attribute(Junction.field_name_eid))290 for reservoir_feat in self.params.reservoirs_vlay.selectedFeatures():291 self.element_ids_nodes.append(reservoir_feat.attribute(Reservoir.field_name_eid))292 for tank_feat in self.params.tanks_vlay.selectedFeatures():293 self.element_ids_nodes.append(tank_feat.attribute(Tank.field_name_eid))294 self.element_ids_links = []295 for pipe_feat in self.params.pipes_vlay.selectedFeatures():296 self.element_ids_links.append(pipe_feat.attribute(Pipe.field_name_eid))297 for pump_feat in self.params.pumps_vlay.selectedFeatures():298 self.element_ids_links.append(pump_feat.attribute(Pump.field_name_eid))299 for valve_feat in self.params.valves_vlay.selectedFeatures():300 self.element_ids_links.append(valve_feat.attribute(Valve.field_name_eid))301 # Build values dictionaries302 xs = self.output_reader.report_times303 ys_d_d = {}304 params_count = 0305 # Nodes306 if self.grb_nodes.isEnabled():307 if self.chk_node_demand.isChecked():308 params_count += 1309 ys_d = {}310 for element_id in self.element_ids_nodes:311 ys_d[element_id] = [312 self.output_reader.node_demands_d[element_id],313 self.params.options.flow_units]314 ys_d_d[OutputParamCodes.NODE_DEMAND] = ys_d315 if self.chk_node_head.isChecked():316 params_count += 1317 ys_d = {}318 for element_id in self.element_ids_nodes:319 ys_d[element_id] = [320 self.output_reader.node_heads_d[element_id],321 Options.units_diameter_tanks[self.params.options.units]]322 ys_d_d[OutputParamCodes.NODE_HEAD] = ys_d323 if self.chk_node_pressure.isChecked():324 params_count += 1325 ys_d = {}326 for element_id in self.element_ids_nodes:327 ys_d[element_id] = [328 self.output_reader.node_pressures_d[element_id],329 Options.units_pressure[self.params.options.units]]330 ys_d_d[OutputParamCodes.NODE_PRESSURE] = ys_d331 if self.chk_node_quality.isChecked():332 params_count += 1333 ys_d = {}334 for element_id in self.element_ids_nodes:335 ys_d[element_id] = [336 self.output_reader.node_qualities_d[element_id],337 Quality.quality_units_text[self.params.options.quality.mass_units]]338 ys_d_d[OutputParamCodes.NODE_QUALITY] = ys_d339 # Links340 if self.grb_links.isEnabled():341 if self.chk_link_flow.isChecked():342 params_count += 1343 ys_d = {}344 for element_id in self.element_ids_links:345 ys_d[element_id] = [346 self.output_reader.link_flows_d[element_id],347 self.params.options.flow_units]348 ys_d_d[OutputParamCodes.LINK_FLOW] = ys_d349 if self.chk_link_velocity.isChecked():350 params_count += 1351 ys_d = {}352 for element_id in self.element_ids_links:353 ys_d[element_id] = [354 self.output_reader.link_velocities_d[element_id],355 Options.units_velocity[self.params.options.units]]356 ys_d_d[OutputParamCodes.LINK_VELOCITY] = ys_d357 if self.chk_link_headloss.isChecked():358 params_count += 1359 ys_d = {}360 for element_id in self.element_ids_links:361 ys_d[element_id] = [362 self.output_reader.link_headlosses_d[element_id],363 Options.units_diameter_tanks[self.params.options.units]]364 ys_d_d[OutputParamCodes.LINK_HEADLOSS] = ys_d365 if self.chk_link_quality.isChecked():366 params_count += 1367 ys_d = {}368 for element_id in self.element_ids_links:369 ys_d[element_id] = [370 self.output_reader.link_qualities_d[element_id],371 Quality.quality_units_text[self.params.options.quality.mass_units]]372 ys_d_d[OutputParamCodes.LINK_QUALITY] = ys_d373 if ys_d_d:374 self.static_canvas.draw_output_line(xs, ys_d_d, params_count)375 def draw_maps(self):376 """377 Draws layers with all the attributes378 :return:379 """380 report_time = self.cbo_map_times.itemText(self.cbo_map_times.currentIndex())381 if self.rad_maps_node_demand.isChecked(): # -------------------------------------------------------------------382 lay_name = u'Node demand'383 lay_id = self.draw_map(LayerType.NODE, self.params.out_lay_node_demand_id, lay_name,384 self.output_reader.node_demands_d, report_time)385 self.params.out_lay_node_demand_id = lay_id386 elif self.rad_maps_node_head.isChecked():387 lay_name = u'Node head'388 lay_id = self.draw_map(LayerType.NODE, self.params.out_lay_node_head_id, lay_name,389 self.output_reader.node_heads_d, report_time)390 self.params.out_lay_node_head_id = lay_id391 elif self.rad_maps_node_pressure.isChecked():392 lay_name = u'Node pressure'393 lay_id = self.draw_map(LayerType.NODE, self.params.out_lay_node_pressure_id, lay_name,394 self.output_reader.node_pressures_d, report_time)395 self.params.out_lay_node_pressure_id = lay_id396 elif self.rad_maps_node_quality.isChecked():397 lay_name = u'Node quality'398 lay_id = self.draw_map(LayerType.NODE, self.params.out_lay_node_quality_id, lay_name,399 self.output_reader.node_qualities_d, report_time)400 self.params.out_lay_node_quality_id = lay_id401 elif self.rad_maps_link_flow.isChecked(): # -------------------------------------------------------------------402 lay_name = u'Link flow'403 lay_id = self.draw_map(LayerType.LINK, self.params.out_lay_link_flow_id, lay_name,404 self.output_reader.link_flows_d, report_time)405 self.params.out_lay_link_flow_id = lay_id406 elif self.rad_maps_link_velocity.isChecked():407 lay_name = u'Link velocity'408 lay_id = self.draw_map(LayerType.LINK, self.params.out_lay_link_velocity_id, lay_name,409 self.output_reader.link_velocities_d, report_time)410 self.params.out_lay_link_velocity_id = lay_id411 elif self.rad_maps_link_headloss.isChecked():412 lay_name = u'Link headloss'413 lay_id = self.draw_map(LayerType.LINK, self.params.out_lay_link_headloss_id, lay_name,414 self.output_reader.link_headlosses_d, report_time)415 self.params.out_lay_link_headloss_id = lay_id416 elif self.rad_maps_link_quality.isChecked():417 lay_name = u'Link quality'418 lay_id = self.draw_map(LayerType.LINK, self.params.out_lay_link_quality_id, lay_name,419 self.output_reader.link_qualities_d, report_time)420 self.params.out_lay_link_quality_id = lay_id421 def draw_map(self, lay_type, lay_id, lay_name, dataset, report_time):422 try:423 QApplication.setOverrideCursor(Qt.WaitCursor)424 lay_name += ' ' + report_time425 lay = LayerUtils.get_lay_from_id(lay_id)426 if lay is None:427 if lay_type == LayerType.NODE:428 lay = self.create_out_node_layer(lay_name, dataset)429 ns = NodeSymbology()430 lay.setRenderer(ns.make_graduated_sym_renderer(lay, report_time))431 elif lay_type == LayerType.LINK:432 lay = self.create_out_link_layer(lay_name, dataset)433 ls = LinkSymbology()434 lay.setRenderer(ls.make_flow_sym_renderer(lay, report_time))435 lay_id = lay.id()436 QgsProject.instance().addMapLayer(lay)437 self.params.out_layers.append(lay)438 else:439 lay.setLayerName(lay_name)440 lay.triggerRepaint()441 QApplication.restoreOverrideCursor()442 finally:443 QApplication.restoreOverrideCursor()444 return lay_id445 def btn_cancel_clicked(self):446 self.setVisible(False)447 def btn_ok_clicked(self):448 pass449 def create_out_node_layer(self, lay_name, values_d):450 return self.create_out_layer(lay_name, values_d, LayerType.NODE)451 def create_out_link_layer(self, lay_name, values_d):452 return self.create_out_layer(lay_name, values_d, LayerType.LINK)453 def create_out_layer(self, lay_name, values_d, lay_type):454 field_name_vars = []455 periods = list(self.output_reader.period_results.keys())456 for period_s in periods:457 text = self.seconds_to_string(458 period_s,459 self.output_reader.sim_duration_secs,460 self.output_reader.report_time_step_secs)461 field_name_vars.append(text)462 if lay_type == LayerType.NODE:463 new_lay = MemoryDS.create_nodes_lay(self.params, field_name_vars, lay_name, self.params.crs)464 elif lay_type == LayerType.LINK:465 new_lay = MemoryDS.create_links_lay(self.params, field_name_vars, lay_name, self.params.crs)466 with edit(new_lay):467 # Update attributes468 for feat in new_lay.getFeatures():469 fid = feat.id()470 eid = feat.attribute(Node.field_name_eid)471 values = values_d[eid]472 for p in range(len(periods)):473 new_lay.changeAttributeValue(fid, p+1, values[p])474 return new_lay475 def seconds_to_string(self, period_s, duration_s, interval_s):476 day = int(math.floor(period_s / 86400))477 hour = period_s / 3600 - day * 24478 minute = period_s / 60 - day * 24 * 60 - hour * 60479 second = period_s - day * 86400 - hour * 3600 - minute * 60480 text = ''481 if duration_s >= 86400:482 # We need days483 text += str(day) + 'd'484 if duration_s >= 3600:485 # We need hours486 text += '{:02}'.format(hour) + 'H'487 text += '{:02}'.format(minute) + 'm'488 if second > 0:489 text += '{:02}'.format(second) + 's'490 return text491class LayerType(object):492 def __init__(self):493 pass494 NODE = 0495 LINK = 1496class LogDialog(QDialog):497 def __init__(self, parent, rpt_file_path):498 self.rpt_file_path = rpt_file_path499 QDialog.__init__(self, parent)500 self.setWindowTitle('Log: ' + self.rpt_file_path)501 main_lay = QVBoxLayout(self)502 self.txt_log = QPlainTextEdit(self)503 self.txt_log.setMinimumWidth(500)504 self.txt_log.setMinimumHeight(300)505 main_lay.addWidget(self.txt_log)506 self.btn_close = QPushButton('Close')507 self.btn_close.clicked.connect(self.close_dialog)508 main_lay.addWidget(self.btn_close)509 self.fill_txt()510 def close_dialog(self):511 self.close()512 def fill_txt(self):513 with codecs.open(self.rpt_file_path, 'r', encoding='UTF-8') as inp_f:514 lines = inp_f.read().splitlines()515 for line in lines:516 self.txt_log.appendPlainText(line.replace('\b', ''))517 # Scroll up518 self.txt_log.moveCursor(QTextCursor.Start)...

Full Screen

Full Screen

test_shellcommand.py

Source:test_shellcommand.py Github

copy

Full Screen

1"""Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.2 Licensed under the Apache License, Version 2.0 (the "License").3 You may not use this file except in compliance with the License.4 You may obtain a copy of the License at5 http://www.apache.org/licenses/LICENSE-2.06 Unless required by applicable law or agreed to in writing, software7 distributed under the License is distributed on an "AS IS" BASIS,8 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.9 See the License for the specific language governing permissions and10 limitations under the License.11"""12import unittest13from multiprocessing import Pipe, Process14from autotrail.core.api.management import read_messages15from autotrail.workflow.helpers.step import ShellCommand, ShellCommandFailedError, Step16DELAY = 0.517class ShellCommandTests(unittest.TestCase):18 def test_successful_run(self):19 shell_command = ShellCommand(delay=DELAY)20 output_reader, output_writer = Pipe(duplex=False)21 shell_command(output_writer, 'echo "Hello"')22 self.assertEqual(list(read_messages(output_reader, timeout=DELAY)), [23 '[Command: echo "Hello"] -- Starting...',24 '[STDOUT] -- Hello'])25 def test_output_filter(self):26 shell_command = ShellCommand(stdout_filter=lambda x: x if 'Hello' not in x else None, delay=DELAY)27 output_reader, output_writer = Pipe(duplex=False)28 shell_command(output_writer, 'echo "Hello\nBye"')29 self.assertEqual(list(read_messages(output_reader, timeout=DELAY)), [30 '[Command: echo "Hello\nBye"] -- Starting...',31 '[STDOUT] -- Bye'])32 def test_output_tailing(self):33 shell_command = ShellCommand(delay=DELAY)34 output_reader, output_writer = Pipe(duplex=False)35 command = 'echo "Hello"; sleep 1; echo "Bye"'36 process = Process(target=shell_command, args=(output_writer, command), kwargs=dict(shell=True))37 process.start()38 self.assertEqual(list(read_messages(output_reader, timeout=DELAY)), [39 '[Command: {}] -- Starting...'.format(command),40 '[STDOUT] -- Hello'])41 process.join()42 self.assertEqual(list(read_messages(output_reader, timeout=DELAY)), ['[STDOUT] -- Bye'])43 def test_error_detection(self):44 shell_command = ShellCommand(error_filter=lambda x: x if 'Error' in x else None, delay=DELAY)45 output_reader, output_writer = Pipe(duplex=False)46 with self.assertRaises(ShellCommandFailedError):47 shell_command(output_writer, 'echo "Hello\nSome Error"')48 self.assertEqual(list(read_messages(output_reader, timeout=DELAY)), [49 '[Command: echo "Hello\nSome Error"] -- Starting...',50 '[STDOUT] -- Hello'])51 def test_exit_code_check(self):52 shell_command = ShellCommand(delay=DELAY)53 output_reader, output_writer = Pipe(duplex=False)54 filename = 'foo_bar_baz'55 command = 'ls {}'.format(filename)56 with self.assertRaises(ShellCommandFailedError):57 shell_command(output_writer, command)58 stdout = list(read_messages(output_reader, timeout=DELAY))59 self.assertEqual(stdout[0], '[Command: {}] -- Starting...'.format(command))60 self.assertIn('[STDERR] -- ', stdout[1])61 self.assertIn('No such file or directory', stdout[1])62 def test_error_filter(self):63 shell_command = ShellCommand(stderr_filter=lambda x: None if 'No such file' in x else x, delay=DELAY)64 output_reader, output_writer = Pipe(duplex=False)65 filename = 'foo_bar_baz'66 command = 'ls {}'.format(filename)67 with self.assertRaises(ShellCommandFailedError):68 shell_command(output_writer, command)69 self.assertEqual(list(read_messages(output_reader, timeout=DELAY)), [70 '[Command: {}] -- Starting...'.format(command),71 ])72 def test_exit_code_filter(self):73 shell_command = ShellCommand(exit_code_filter=lambda x: None if x != 0 else x, delay=DELAY)74 output_reader, output_writer = Pipe(duplex=False)75 filename = 'foo_bar_baz'76 command = 'ls {}'.format(filename)77 shell_command(output_writer, command)78 stdout = list(read_messages(output_reader, timeout=DELAY))79 self.assertEqual(stdout[0], '[Command: {}] -- Starting...'.format(command))80 self.assertIn('[STDERR] -- ', stdout[1])81 self.assertIn('No such file or directory', stdout[1])82 def test_stdin(self):83 shell_command = ShellCommand(delay=DELAY)84 output_reader, output_writer = Pipe(duplex=False)85 input_reader, input_writer = Pipe(duplex=False)86 input_writer.send('foo\n')87 shell_command(output_writer, 'read a; echo $a', input_reader=input_reader, shell=True)88 self.assertEqual(list(read_messages(output_reader, timeout=DELAY)), [89 '[Command: read a; echo $a] -- Starting...',90 '[STDOUT] -- foo'])91class ShellCommandStepTests(unittest.TestCase):92 def test_shell_command_step(self):93 shell_command = ShellCommand(delay=DELAY)94 output_reader, output_writer = Pipe(duplex=False)95 step = Step(shell_command)96 step.start(output_writer, 'echo "Hello"')97 step.join()98 self.assertFalse(step.is_alive())99 self.assertEqual(str(step), str(shell_command))100 self.assertEqual(list(read_messages(output_reader, timeout=DELAY)), ['[Command: echo "Hello"] -- Starting...',...

Full Screen

Full Screen

make_dataset.py

Source:make_dataset.py Github

copy

Full Screen

1#########################################################2# make_dataset.py3#########################################################45import csv6import random78output_reader = []9output_train = []10output_dev = []11output_test = []1213with open('/home/zhu/Desktop/BERT_temporaryname/datasets/set_data.csv') as f:14 reader = csv.reader(f)15 lines = [row for row in reader]16 #print(lines)17 len_lines = len(lines) #print(len_lines)18 for i in range(len_lines):19 line = lines[i]#.split('\t')20 output_reader.append(line)2122 #print(output_reader)23 #print(line)2425def output_data(num, list_output): 26 for i in range(num):27 l = random.choice(output_reader)#print(l)28 output_reader.remove(l)#print(len(output_reader))29 list_output.append(l)30 i += 13132def write_data(route, output_set):33 print(output_set)34 with open(route, 'a') as f:35 writer = csv.writer(f)36 for i in range(len(output_set)):37 writer.writerow([output_set[i][0], output_set[i][1]])38 #writer.writerow([output_set[i]])3940# dev, test, train dataのデータ量を変える41output_data(400, output_train)42output_data(100, output_dev)43#output_data(50, output_test)4445write_data('/home/zhu/Desktop/BERT_temporaryname/datasets/set_train_data_400.csv', output_train)46write_data('/home/zhu/Desktop/BERT_temporaryname/datasets/set_dev_data_100.csv', output_dev)47#write_data('/home/zhu/Desktop/BERT_temporaryname/datasets/set_test_n_100_v3.csv', output_test) ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run fMBT automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful