How to use report_stream method in Slash

Best Python code snippet using slash

grading_tools.py

Source:grading_tools.py Github

copy

Full Screen

1import io2import cs_grading.executable_tools as exe3import cs_grading.io_tools as sysio4import cs_grading.logging_tools as log5import cs_grading.markdown_tools as md6import cs_grading.result_tools as res7import cs_grading.rubric_tools as rb8####################################################################################################9# Purpose: Defining several constants that can be used for shorthand in many of the methods below.10####################################################################################################11GRHLEVEL_TITLE = 212GRHLEVEL_PROBLEM = 313GRHLEVEL_OTHER = 414GRFOOTER = 'Please close this issue once you have viewed your grade report, '15GRFOOTER += 'or let the grader know if you need further details about this grade report '16GRFOOTER += '(see instructions below).\n\n'17GRFOOTER += '### Disputes\n\n'18GRFOOTER += 'If you wish to discuss any matters regarding your score on this assignment, '19GRFOOTER += 'please write it as a comment on this issue and assign your grader. '20GRFOOTER += 'Any and all discussions pertaining to your homework score **must** be done via '21GRFOOTER += 'comments on this issue. Do not use email to discuss grading matters as they won\'t '22GRFOOTER += 'be considered part of the regrade process. All discussions will be done privately '23GRFOOTER += 'using this issue.\n\n'24GRFOOTER += 'You have 7 days from the time this issue was posted to *raise* a dispute about any '25GRFOOTER += 'grading issues. Allow up to 48 hours for your grader to respond. If your grader '26GRFOOTER += 'cannot resolve the issue within 7 days, you must elevate the issue to one of the TAs '27GRFOOTER += '(Michail, Gozde, or Yeji) by assigning them to this issue or attending their office '28GRFOOTER += 'hours. If after an additional 7 days you are still unhappy with the grading result, '29GRFOOTER += 'you must elevate the issue to a professor within 3 days. The professors will have the '30GRFOOTER += 'final input.\n\n'31GRFOOTER += 'This means you have a maximum of 7 + 7 + 3 days to resolve all grading disputes, '32GRFOOTER += 'after which point all grades are final.\n\n'33GRFOOTER += 'You are expected to follow the _Grading Disputes_ policies outlined on the '34GRFOOTER += '[assignments](http://bytes.usc.edu/cs104/assignments) page.'35####################################################################################################36# Purpose: Grader information.37# Arguments:38# name: The grader's name.39# github: The grader's GitHub id.40####################################################################################################41class Grader:42 def __init__(self, grader_info=None, logging_level=log.LOGLEVEL_ERROR):43 self.name = 'Grader-name'44 self.github = 'grader-github-ID'45 if grader_info is not None:46 self.load_grader_info(grader_info, logging_level)47 def load_grader_info(self, grader_info, logging_level=log.LOGLEVEL_ERROR):48 if sysio.exist_file(grader_info):49 grader_file = open(grader_info, mode='r', errors='ignore')50 lines = grader_file.readlines()51 grader_file.close()52 if len(lines) < 2:53 log.log_error(54 'Grader info needs 2 lines but found {}'.format(str(len(lines))),55 logging_level56 )57 else:58 if len(lines) > 2:59 log.log_warning(60 'Grader info needs 2 lines but found {}'.format(str(len(lines))),61 logging_level62 )63 self.name = lines[0].strip()64 self.github = lines[1].strip()65 else:66 log.log_error('Grader info file {} not found'.format(grader_info), logging_level)67 def get_grader_info(self):68 return '{name} (@{github})'.format(name=self.name, github=self.github)69####################################################################################################70# Purpose: Breakdown of homework grading.71# Arguments:72# number: The homework number.73####################################################################################################74class Homework:75 def __init__(self,76 number,77 result_dir,78 remove_output,79 detailed_results=True,80 compile_flags=exe.CPPFLAGS,81 logging_level=log.LOGLEVEL_ERROR82 ):83 self.number = number84 self.result_dir = result_dir85 self.remove_output = remove_output86 self.detailed_results = detailed_results87 self.compile_flags = compile_flags88 self.logging_level = logging_level89 self.total_points = 090 self.total_max = 091 self.total_other = 092 self.problems = []93 self.other_deductions = []94 self.problem_subtotal = []95 sysio.mkdir(self.result_dir, logging_level=self.logging_level)96 def add_problem(self, problem):97 self.problems.append(problem)98 def _add_other_deduction(self, points, comment):99 self.other_deductions.append((points, comment))100 def grade_other_deduction(self, other_rubric):101 other_deduct = rb.load_other_rubric(other_rubric, self.logging_level)102 for deduct in other_deduct:103 self._add_other_deduction(104 deduct[rb.RUBRIC_OTHER_POINT],105 deduct[rb.RUBRIC_OTHER_COMMENT])106 def write_score_breakdown(self, report_file):107 report_stream = io.StringIO()108 for problem in self.problems:109 problem.write_score_breakdown(report_stream)110 points = problem.get_test_points()111 self.total_points += points112 self.total_max += problem.test_max113 self.problem_subtotal.append(str(round(points, 2)))114 md.write_header(report_stream, 'Other Deductions & Penalties:', GRHLEVEL_PROBLEM)115 for deduct in self.other_deductions:116 self.total_other += deduct[0]117 self.total_points -= deduct[0]118 deduction_text = '(-{pt} points) {comment}'.format(119 pt=round(deduct[0], 2),120 comment=deduct[1])121 md.write_list(report_stream, deduction_text)122 if self.other_deductions:123 md.end_list(report_stream)124 self.total_points = max(self.total_points, 0)125 self._write_score_calculation(report_stream)126 summary_text = 'I have completely finished grading your'127 summary_text += ' HW {num:02d} test cases and you received'.format(num=self.number)128 summary_text += ' **{total}/{max} points**.'.format(129 total=round(self.total_points, 2),130 max=round(self.total_max, 2))131 breakdown_text = 'This is the score breakdown:'132 md.write_paragraph(report_file, summary_text)133 md.write_paragraph(report_file, breakdown_text)134 md.write_paragraph(report_file, report_stream.getvalue().strip())135 report_stream.close()136 def _write_score_calculation(self, report_file):137 md.write_header(report_file, 'Total Score:', GRHLEVEL_PROBLEM)138 calc = '({test}) - ({other}) = **{total:}/{max} points**'.format(139 test=' + '.join(self.problem_subtotal),140 other=self.total_other,141 total=round(self.total_points, 2),142 max=round(self.total_max, 2))143 md.write_list(report_file, calc)144 md.end_list(report_file)145####################################################################################################146# Purpose: Breakdown of homework grading.147# Arguments:148# number: The problem number.149# nams: The problem name.150# test_max: The max number of points.151####################################################################################################152class Problem:153 def __init__(self, homework, number, name, test_max):154 homework.add_problem(self)155 self.result_dir = homework.result_dir156 self.remove_output = homework.remove_output157 self.detailed_results = homework.detailed_results158 self.compile_flags = homework.compile_flags159 self.logging_level = homework.logging_level160 self.number = number161 self.name = name162 self.test_max = test_max163 self.test_count = 0164 self.test_deductions = []165 self.test_points = None166 self.valgrind_error = 0167 self.compile_file = self.result_dir + '/' + self.name + '_compile.txt'168 self.result_file = self.result_dir + '/' + self.name + '_result.txt'169 self.valgrind_file = self.result_dir + '/' + self.name + '_valgrind.txt'170 self.formatted_file = self.result_dir + '/' + self.name + '_formatted.txt'171 sysio.clean_file(self.compile_file)172 sysio.clean_file(self.result_file)173 sysio.clean_file(self.valgrind_file)174 sysio.clean_file(self.formatted_file)175 self.use_valgrind = None176 self.timeout = None177 self.rubric = dict()178 def generate_results(self,179 grading,180 use_valgrind,181 timeout=None,182 ):183 self.use_valgrind = use_valgrind184 self.timeout = timeout185 return grading(self)186 def open_result(self, text_editor=sysio.TEXT_EDITOR):187 if sysio.exist_file(self.result_file):188 sysio.open_file(self.result_file, text_editor)189 if sysio.exist_file(self.valgrind_file):190 sysio.open_file(self.valgrind_file, text_editor)191 def grade_problem(self, general_rubric=None, problem_rubric=None):192 if not self._results_generated():193 log.log_error(194 'Grading problem {} before results are generated'.format(self.name),195 self.logging_level)196 else:197 if general_rubric is not None:198 self.rubric = rb.load_rubric(199 general_rubric,200 self.rubric,201 logging_level=self.logging_level)202 if problem_rubric is not None:203 self.rubric = rb.load_rubric(204 problem_rubric,205 self.rubric,206 logging_level=self.logging_level)207 self._grade_compile_result()208 self.test_count = self._grade_test_result()209 self._grade_valgrind_result()210 def _grade_compile_result(self):211 compile_max = rb.get_warning_deduction_limit(self.rubric)212 compile_total = compile_max213 warnings = res.read_compile_result(self.compile_file, logging_level=self.logging_level)214 for warning in warnings:215 deduction = rb.get_warning_deduction(self.rubric, warning)216 if deduction <= compile_total or compile_max < 0:217 self._add_deduction(218 deduction,219 'Warning {}'.format(220 res.CWARNING_OUTPUT.get(warning, ''))221 )222 compile_total -= deduction223 else:224 self._add_deduction(225 compile_total,226 'Warning {} (should be -{} but warning deduction capped at -{})'.format(227 res.CWARNING_OUTPUT.get(warning, ''),228 deduction,229 compile_max),230 )231 compile_total = 0232 def _grade_test_result(self):233 test_names = res.read_formatted_result(234 self.formatted_file,235 logging_level=self.logging_level)236 for test_name, result, vresult in test_names:237 if result != res.ERESULT_PASS:238 self._add_deduction(239 rb.get_test_deduction(self.rubric, test_name),240 'Test {} {}'.format(test_name, res.ERESULT_OUTPUT.get(result, ''))241 )242 if vresult == res.VRESULT_FAIL:243 self._add_valgrind_deduction()244 return len(test_names)245 def _add_valgrind_deduction(self):246 self.valgrind_error += 1247 def _grade_valgrind_result(self):248 if self.valgrind_error > 0:249 valgrind_max = rb.get_valgrind_deduction_limit(self.rubric)250 deduction = rb.get_valgrind_deduction(self.rubric, self.valgrind_error)251 if deduction <= valgrind_max or valgrind_max < 0:252 self._add_deduction(253 rb.get_valgrind_deduction(self.rubric, self.valgrind_error),254 'Valgrind error')255 else:256 self._add_deduction(257 valgrind_max,258 'Valgrind error (should be -{} but valgrind deduction capped at -{})'.format(259 deduction,260 valgrind_max),261 )262 def _results_generated(self):263 return (self.compile_file is not None264 and self.result_file is not None265 and self.valgrind_file is not None266 and self.formatted_file is not None)267 def _add_deduction(self, points, comment):268 self.test_deductions.append((points, comment))269 def get_test_points(self):270 if self.test_points is None:271 total = 0272 for deduct in self.test_deductions:273 total += deduct[0]274 self.test_points = max(0, self.test_max - total)275 return self.test_points276 def write_score_breakdown(self, ostream):277 self.test_points = self.get_test_points()278 problem_title = 'Problem {num} ({name}) ({pt}/{total}):'.format(279 num=self.number,280 name=self.name,281 pt=round(self.test_points, 2),282 total=round(self.test_max, 2))283 md.write_header(ostream, problem_title, GRHLEVEL_PROBLEM)284 for deduct in self.test_deductions:285 deduction_text = '(-{pt} points) {comment}'.format(286 pt=round(deduct[0], 2),287 comment=deduct[1])288 md.write_list(ostream, deduction_text)289 if self.test_deductions:290 md.end_list(ostream)291####################################################################################################292# Purpose: Generate grade report for homework.293# Arguments:294# homework: Homework instance.295# grader: Grader information.296# report_dir: Directory where report is generated.297# overwrite: If grade report already exist, should it be overwritten.298####################################################################################################299def generate_grade_report(homework,300 grader,301 report_dir,302 overwrite=False,303 logging_level=log.LOGLEVEL_ERROR304 ):305 report_filename = report_dir + 'GR{}_hw-username.md'.format(str(homework.number))306 if sysio.exist_file(report_filename):307 if not overwrite:308 log.log_error('Report {} already exists'.format(report_filename), logging_level)309 return310 else:311 log.log_warning('Overwriting existing report {}'.format(report_filename), logging_level)312 report_file = open(report_filename, mode='w')313 title = '## HW {num:02d} Test Case Grade Report'.format(num=homework.number)314 md.write_header(report_file, title, GRHLEVEL_TITLE)315 homework.write_score_breakdown(report_file)316 grader_text = 'HW Graded by: {}'.format(grader.get_grader_info())317 md.write_header(report_file, grader_text, GRHLEVEL_OTHER)318 md.write_paragraph(report_file, GRFOOTER)...

Full Screen

Full Screen

parser.py

Source:parser.py Github

copy

Full Screen

1"""2:synopsis: Specialized :class:`ptp.libptp.parser.AbstractParser` classes for the tool Skipfish.3.. moduleauthor:: Tao Sauvage4"""5import re6import os7import js2py8from ptp.libptp import constants9from ptp.libptp.exceptions import NotSupportedVersionError, ReportNotFoundError10from ptp.libptp.parser import AbstractParser, FileParser11class SkipfishJSParser(AbstractParser):12 """Skipfish JS specialized parser."""13 __tool__ = 'skipfish'14 __format__ = 'js'15 __version__ = r'2\.10b'16 HIGH = 417 MEDIUM = 318 LOW = 219 WARNINGS = 120 INFO = 021 RANKING_SCALE = {22 HIGH: constants.HIGH,23 MEDIUM: constants.MEDIUM,24 LOW: constants.LOW,25 WARNINGS: constants.INFO,26 INFO: constants.INFO}27 _reportfile = 'samples.js'28 _metadatafile = 'summary.js'29 def __init__(self, pathname, light=False):30 """Initialize Skipfish JS parser.31 :param str pathname: Path to the report directory.32 :param bool light: `True` to only parse the ranking of the findings from the report.33 """34 self.search_directory = ''35 self.light = light36 metadatafile = self._recursive_find(pathname, self._metadatafile)37 if metadatafile:38 metadatafile = metadatafile[0] # Only keep first instance (in case multiple match)39 reportfile = self._recursive_find(pathname, self._reportfile)40 if reportfile:41 self.search_directory = pathname42 reportfile = reportfile[0] # Only keep first instance (in case multiple match)43 self.metadata_stream, self.report_stream = self.handle_file(metadatafile, reportfile)44 self.re_var_pattern = re.compile(r"var\s+(?P<variables>[a-zA-Z_0-9]+)\s+(?==)")45 self.re_metadata = re.compile(r"var\s+([a-zA-Z_0-9]+)\s+=\s+'{0,1}([^;']*)'{0,1};")46 self._re_reponse_status_code = re.compile(r"^HTTP.*?\/\d\.\d (\d+) .")47 @classmethod48 def handle_file(cls, metadatafile, reportfile):49 """Process the two report files of the Skipfish report.50 :param str metadatafile: Path to the metadata file.51 :param str reportfile: Path to the report file.52 :raises TypeError: if the files have not the right extension.53 :raises OSError: if an error occurs when reading the files.54 :raises IOError: if an error occurs when reading the files.55 :return: Both metadata and report files' contents.56 :rtype: :class:`tuple`57 """58 if not metadatafile.endswith(cls.__format__) or not reportfile.endswith(cls.__format__):59 raise TypeError("This parser only supports '%s' files" % cls.__format__)60 pathname, filename = os.path.split(metadatafile)61 metadata_stream = FileParser.handle_file(pathname=pathname, filename=filename)62 pathname, filename = os.path.split(reportfile)63 report_stream = FileParser.handle_file(pathname=pathname, filename=filename)64 return (metadata_stream, report_stream)65 @classmethod66 def is_mine(cls, pathname, light=False):67 """Check if it can handle the report file.68 :param str pathname: Path to the report directory.69 :param bool light: `True` to only parse the ranking of the findings from the report.70 :raises IOError: when the report file cannot be found.71 :raises OSError: when the report file cannot be found.72 :return: `True` if it supports the report, `False` otherwise.73 :rtype: :class:`bool`74 """75 metadatafile = cls._recursive_find(pathname, cls._metadatafile)76 if not metadatafile:77 return False78 metadatafile = metadatafile[0]79 reportfile = cls._recursive_find(pathname, cls._reportfile)80 if not reportfile:81 return False82 reportfile = reportfile[0]83 try:84 metadata_stream, report_stream = cls.handle_file(metadatafile, reportfile)85 except TypeError:86 return False87 return True88 def parse_metadata(self):89 """Retrieve the metadata of the report.90 :raises: :class:`NotSupportedVersionError` -- if it does not support this version of the report.91 :return: Dictionary containing the metadatas.92 :rtype: :class:`dict`93 .. note::94 In skipfish the metadata are saved into the summary.js file as follow:95 .. code-block:: js96 var sf_version = version<string>;97 var scan_date = date<'Ddd Mmm d hh:mm:ss yyyy'>;98 var scan_seed = scan seed<integer>99 var scan_ms = elapsed time in ms<integer>;100 """101 re_result = self.re_metadata.findall(self.metadata_stream)102 metadata = dict({el[0]: el[1] for el in re_result})103 # Check if the version if the good one.104 if self.check_version(metadata, key='sf_version'):105 return metadata106 raise NotSupportedVersionError('PTP does NOT support this version of Skipfish.')107 def _parse_report_full(self, dir_list):108 """Parse HTTP requests from directories listed in the samples.js file.109 From all the directories, it reads request.dat and response.dat file and return a list of dict resquests and110 responses.111 """112 data = []113 for dirs in dir_list:114 try:115 with open(os.path.join(dirs['dir'], 'request.dat'), 'r') as req_data:116 request = req_data.read()117 except IOError:118 request = "NOT_FOUND"119 try:120 with open(os.path.join(dirs['dir'], 'response.dat'), 'r') as res_data:121 response = res_data.read()122 response_status_code = self._re_reponse_status_code.findall(response)[0]123 response_header, response_body = response.split('\n\n', 1)124 except IOError:125 response_body = response_header = response_status_code = "NOT_FOUND"126 # Somehow follow naming conventions from http://docs.python-requests.org/en/master/127 data.append({128 'request': request,129 'status_code': response_status_code,130 'headers': response_header,131 'body': response_body132 })133 return data134 def parse_report(self):135 """Retrieve the results from the report.136 :raises: :class:`ReportNotFoundError` -- if the report file was not found.137 :return: List of dicts where each one represents a discovery.138 :rtype: :class:`list`139 .. note::140 Example of retrieved data after conversion (i.e. `raw_report`) using the module :mod:`ast`:141 .. code-block:: js142 [{ 'severity': 3, 'type': 40402, 'samples': [143 { 'url': 'http://demo.testfire.net/bank/login.aspx', 'extra': 'SQL syntax string', 'sid': '21010', 'dir': '_i2/0' },144 { 'url': 'http://demo.testfire.net/bank/login.aspx', 'extra': 'SQL syntax string', 'sid': '21010', 'dir': '_i2/1' },145 { 'url': 'http://demo.testfire.net/subscribe.aspx', 'extra': 'SQL syntax string', 'sid': '21010', 'dir': '_i2/2' } ]146 },]147 """148 REPORT_VAR_NAME = 'issue_samples'149 variables = self.re_var_pattern.findall(self.report_stream)150 split_data = self.report_stream.split(";")151 js_data = [data for data in split_data if data is not None]152 py_data = []153 format_data = {} # Final python dict after converting js to py154 dirs = [] # List of directories of all urls155 # Converting js to py to make it simple to process156 for data in js_data:157 temp_data = js2py.eval_js(data)158 if temp_data is not None:159 py_data.append(temp_data)160 # Mapping variable to its content161 for i in range(len(py_data)):162 format_data[variables[i]] = py_data[i]163 if REPORT_VAR_NAME not in variables:164 raise ReportNotFoundError('PTP did NOT find issue_samples variable. Is this the correct file?')165 # We now have a raw version of the Skipfish report as a list of dict.166 self.vulns = [167 {'ranking': self.RANKING_SCALE[vuln['severity']]}168 for vuln in format_data[REPORT_VAR_NAME]]169 if not self.light:170 for var in variables:171 for item in format_data[var]:172 for sample in item['samples']:173 dirs.append({'url': sample['url'], 'dir': os.path.join(self.search_directory, sample['dir'])})174 self.vulns.append({'ranking': constants.UNKNOWN, 'transactions': self._parse_report_full(dirs)})...

Full Screen

Full Screen

report.py

Source:report.py Github

copy

Full Screen

1"""Fourth stage: Report"""2# Standard library imports3import io4from datetime import datetime5# Third party imports6import panel as pn7import param8import pyplugs9# Geo:N:G imports10from app import config11from app.assets import panes12from app.assets import state13from geong_common import reports14from geong_common.log import logger15# Find name of app and stage16*_, PACKAGE, APP, STAGE = __name__.split(".")17CFG = config.app[PACKAGE][APP][STAGE]18class Model(param.Parameterized):19 """Data defining this stage"""20 # Values set by previous stage21 # Parameters for the current stage22 geox_id = param.String(label="GeoX ID")23 scenario_names = param.List()24 def __init__(self, scenario_names, *args, **kwargs):25 """Initialize values of stage"""26 super().__init__()27 self._report_cfg = config.app.report[CFG.report]28 self._state = state.get_user_state().setdefault(APP, {})29 self.scenario_names = scenario_names30 self.scenario_selector = pn.widgets.CrossSelector(31 name="Choose scenarios",32 value=self.scenario_names,33 options=self.scenario_names,34 definition_order=False,35 )36 self.scenarios = self._state["scenarios"].copy()37 @property38 def data(self):39 """Data made available for the final report"""40 return {41 "date": datetime.now(),42 "geox_id": self.geox_id,43 "scenario_list": "\n".join(self.scenario_selector.value),44 "scenarios": [45 dict(self.scenarios[s], scenario_name=s)46 for s in self.scenario_selector.value47 ],48 }49 @param.depends("geox_id")50 def download_report_button(self):51 """Button for downloading report"""52 return pn.widgets.FileDownload(53 name="Download report",54 button_type="success",55 callback=self.download_report,56 filename=self._report_cfg.replace("file_name", **self.data),57 )58 def download_report(self, *event):59 """Generate and download final report"""60 report_stream = io.BytesIO()61 reports.generate(62 self._report_cfg.generator,63 template_url=self._report_cfg.template,64 cfg=self._report_cfg,65 data=self.data,66 output_path=report_stream,67 )68 try:69 session_id = pn.state.curdoc.session_context.id70 logger.insights(71 f"New report with GeoXID: {self.geox_id}, SessionID: {session_id}"72 )73 except AttributeError as e:74 logger.insights(f"New report with GeoXID: {self.geox_id}")75 logger.error(f"SessionID not available: {e}")76 # Clear scenarios from state77 self._state["scenarios"].clear()78 # Reset report byte stream to start before passing it back79 report_stream.seek(0)80 return report_stream81class View:82 """Define the look and feel of the stage"""83 def panel(self):84 return pn.Column(85 panes.headline(CFG.label),86 self.scenario_selector,87 pn.Row(88 pn.widgets.TextInput.from_param(self.param.geox_id),89 self.download_report_button,90 ),91 sizing_mode="stretch_width",92 )93@pyplugs.register94class StageReport(Model, View):...

Full Screen

Full Screen

helpers.py

Source:helpers.py Github

copy

Full Screen

1import configparser2import zeep3import logging4#import pandas as pd5#from pandas_gbq import gbq6import io7from lxml import etree8from zeep import Plugin9# Use this plugin as a kwarg for the zeep class to print SOAP messages10class LoggingPlugin(Plugin):11 def ingress(self, envelope, http_headers, operation):12 print(etree.tostring(envelope, pretty_print=True))13 return envelope, http_headers14 def egress(self, envelope, http_headers, operation, binding_options):15 print(etree.tostring(envelope, pretty_print=True))16 return envelope, http_headers17def backoff_hdlr(details):18 """ Prints backoff debug messages"""19 print("Backing off {wait:0.1f} seconds after {tries} tries "20 "calling function {target}".format(**details))21def backoff_hdlr_with_args(details):22 """ USE FOR DEBUGGING ONLY - Prints out all details about backoff events,23 including ultipro client object with credentials, to STDOUT24 """25 print("Backing off {wait:0.1f} seconds after {tries} tries "26 "calling function {target} with args {args} and kwargs "27 "{kwargs}".format(**details))28def read_conf(conf):29 parser = configparser.ConfigParser()30 parser.read(conf)31 rv = {}32 for section in parser.sections():33 for key, value in parser.items(section):34 rv[f"{section}.{key}"] = value35 return rv36def serialize(response):37 return zeep.helpers.serialize_object(response)38def write_file(report_stream, path):39 """Writes a stream to a file"""40 f = open(path, "w")41 f.write(report_stream)42 f.close()43#def csv_stream_to_dataframe(report_stream, delimiter):44# """Reads a streaming csv into a pandas DataFrame."""45# report = io.StringIO(report_stream)46# df = pd.read_csv(report, sep=delimiter, encoding='utf-8', index_col=False)47# return df48#49#def dataframe_to_bigquery(df):50# """Writes a DataFrame to Big Query. Column names will be downcased and51# all non-alphanumeric characters converted to underscores."""52# df.columns = df.columns.str.strip().str.lower().str.replace(r'[^\w]', '_')53 # gbq.to_gbq(54 # dataframe=df,55 # destination_table='lake1.pond1',56 # project_id='pubsub-bq-pipe-1',57 # chunksize=10000,58 # verbose=True,59 # reauth=False,60 # if_exists='append',61 # private_key="./pubsub-bq-pipe-1-a865bbaa5f48.json",...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful