Best Python code snippet using slash
grading_tools.py
Source:grading_tools.py  
1import io2import cs_grading.executable_tools as exe3import cs_grading.io_tools as sysio4import cs_grading.logging_tools as log5import cs_grading.markdown_tools as md6import cs_grading.result_tools as res7import cs_grading.rubric_tools as rb8####################################################################################################9# Purpose: Defining several constants that can be used for shorthand in many of the methods below.10####################################################################################################11GRHLEVEL_TITLE = 212GRHLEVEL_PROBLEM = 313GRHLEVEL_OTHER = 414GRFOOTER = 'Please close this issue once you have viewed your grade report, '15GRFOOTER += 'or let the grader know if you need further details about this grade report '16GRFOOTER += '(see instructions below).\n\n'17GRFOOTER += '### Disputes\n\n'18GRFOOTER += 'If you wish to discuss any matters regarding your score on this assignment, '19GRFOOTER += 'please write it as a comment on this issue and assign your grader. '20GRFOOTER += 'Any and all discussions pertaining to your homework score **must** be done via '21GRFOOTER += 'comments on this issue.  Do not use email to discuss grading matters as they won\'t '22GRFOOTER += 'be considered part of the regrade process. All discussions will be done privately '23GRFOOTER += 'using this issue.\n\n'24GRFOOTER += 'You have 7 days from the time this issue was posted to *raise* a dispute about any '25GRFOOTER += 'grading issues. Allow up to 48 hours for your grader to respond. If your grader '26GRFOOTER += 'cannot resolve the issue within 7 days, you must elevate the issue to one of the TAs '27GRFOOTER += '(Michail, Gozde, or Yeji) by assigning them to this issue or attending their office '28GRFOOTER += 'hours. If after an additional 7 days you are still unhappy with the grading result, '29GRFOOTER += 'you must elevate the issue to a professor within 3 days. The professors will have the '30GRFOOTER += 'final input.\n\n'31GRFOOTER += 'This means you have a maximum of 7 + 7 + 3 days to resolve all grading disputes, '32GRFOOTER += 'after which point all grades are final.\n\n'33GRFOOTER += 'You are expected to follow the _Grading Disputes_ policies outlined on the '34GRFOOTER += '[assignments](http://bytes.usc.edu/cs104/assignments) page.'35####################################################################################################36# Purpose: Grader information.37# Arguments:38#     name:    The grader's name.39#     github:  The grader's GitHub id.40####################################################################################################41class Grader:42    def __init__(self, grader_info=None, logging_level=log.LOGLEVEL_ERROR):43        self.name = 'Grader-name'44        self.github = 'grader-github-ID'45        if grader_info is not None:46            self.load_grader_info(grader_info, logging_level)47    def load_grader_info(self, grader_info, logging_level=log.LOGLEVEL_ERROR):48        if sysio.exist_file(grader_info):49            grader_file = open(grader_info, mode='r', errors='ignore')50            lines = grader_file.readlines()51            grader_file.close()52            if len(lines) < 2:53                log.log_error(54                    'Grader info needs 2 lines but found {}'.format(str(len(lines))),55                    logging_level56                    )57            else:58                if len(lines) > 2:59                    log.log_warning(60                        'Grader info needs 2 lines but found {}'.format(str(len(lines))),61                        logging_level62                    )63                self.name = lines[0].strip()64                self.github = lines[1].strip()65        else:66            log.log_error('Grader info file {} not found'.format(grader_info), logging_level)67    def get_grader_info(self):68        return '{name} (@{github})'.format(name=self.name, github=self.github)69####################################################################################################70# Purpose: Breakdown of homework grading.71# Arguments:72#     number:  The homework number.73####################################################################################################74class Homework:75    def __init__(self,76                 number,77                 result_dir,78                 remove_output,79                 detailed_results=True,80                 compile_flags=exe.CPPFLAGS,81                 logging_level=log.LOGLEVEL_ERROR82                ):83        self.number = number84        self.result_dir = result_dir85        self.remove_output = remove_output86        self.detailed_results = detailed_results87        self.compile_flags = compile_flags88        self.logging_level = logging_level89        self.total_points = 090        self.total_max = 091        self.total_other = 092        self.problems = []93        self.other_deductions = []94        self.problem_subtotal = []95        sysio.mkdir(self.result_dir, logging_level=self.logging_level)96    def add_problem(self, problem):97        self.problems.append(problem)98    def _add_other_deduction(self, points, comment):99        self.other_deductions.append((points, comment))100    def grade_other_deduction(self, other_rubric):101        other_deduct = rb.load_other_rubric(other_rubric, self.logging_level)102        for deduct in other_deduct:103            self._add_other_deduction(104                deduct[rb.RUBRIC_OTHER_POINT],105                deduct[rb.RUBRIC_OTHER_COMMENT])106    def write_score_breakdown(self, report_file):107        report_stream = io.StringIO()108        for problem in self.problems:109            problem.write_score_breakdown(report_stream)110            points = problem.get_test_points()111            self.total_points += points112            self.total_max += problem.test_max113            self.problem_subtotal.append(str(round(points, 2)))114        md.write_header(report_stream, 'Other Deductions & Penalties:', GRHLEVEL_PROBLEM)115        for deduct in self.other_deductions:116            self.total_other += deduct[0]117            self.total_points -= deduct[0]118            deduction_text = '(-{pt} points) {comment}'.format(119                pt=round(deduct[0], 2),120                comment=deduct[1])121            md.write_list(report_stream, deduction_text)122        if self.other_deductions:123            md.end_list(report_stream)124        self.total_points = max(self.total_points, 0)125        self._write_score_calculation(report_stream)126        summary_text = 'I have completely finished grading your'127        summary_text += ' HW {num:02d} test cases and you received'.format(num=self.number)128        summary_text += ' **{total}/{max} points**.'.format(129            total=round(self.total_points, 2),130            max=round(self.total_max, 2))131        breakdown_text = 'This is the score breakdown:'132        md.write_paragraph(report_file, summary_text)133        md.write_paragraph(report_file, breakdown_text)134        md.write_paragraph(report_file, report_stream.getvalue().strip())135        report_stream.close()136    def _write_score_calculation(self, report_file):137        md.write_header(report_file, 'Total Score:', GRHLEVEL_PROBLEM)138        calc = '({test}) - ({other}) = **{total:}/{max} points**'.format(139            test=' + '.join(self.problem_subtotal),140            other=self.total_other,141            total=round(self.total_points, 2),142            max=round(self.total_max, 2))143        md.write_list(report_file, calc)144        md.end_list(report_file)145####################################################################################################146# Purpose: Breakdown of homework grading.147# Arguments:148#     number:    The problem number.149#     nams:      The problem name.150#     test_max:  The max number of points.151####################################################################################################152class Problem:153    def __init__(self, homework, number, name, test_max):154        homework.add_problem(self)155        self.result_dir = homework.result_dir156        self.remove_output = homework.remove_output157        self.detailed_results = homework.detailed_results158        self.compile_flags = homework.compile_flags159        self.logging_level = homework.logging_level160        self.number = number161        self.name = name162        self.test_max = test_max163        self.test_count = 0164        self.test_deductions = []165        self.test_points = None166        self.valgrind_error = 0167        self.compile_file = self.result_dir + '/' + self.name + '_compile.txt'168        self.result_file = self.result_dir + '/' + self.name + '_result.txt'169        self.valgrind_file = self.result_dir + '/' + self.name + '_valgrind.txt'170        self.formatted_file = self.result_dir + '/' + self.name + '_formatted.txt'171        sysio.clean_file(self.compile_file)172        sysio.clean_file(self.result_file)173        sysio.clean_file(self.valgrind_file)174        sysio.clean_file(self.formatted_file)175        self.use_valgrind = None176        self.timeout = None177        self.rubric = dict()178    def generate_results(self,179                         grading,180                         use_valgrind,181                         timeout=None,182                        ):183        self.use_valgrind = use_valgrind184        self.timeout = timeout185        return grading(self)186    def open_result(self, text_editor=sysio.TEXT_EDITOR):187        if sysio.exist_file(self.result_file):188            sysio.open_file(self.result_file, text_editor)189        if sysio.exist_file(self.valgrind_file):190            sysio.open_file(self.valgrind_file, text_editor)191    def grade_problem(self, general_rubric=None, problem_rubric=None):192        if not self._results_generated():193            log.log_error(194                'Grading problem {} before results are generated'.format(self.name),195                self.logging_level)196        else:197            if general_rubric is not None:198                self.rubric = rb.load_rubric(199                    general_rubric,200                    self.rubric,201                    logging_level=self.logging_level)202            if problem_rubric is not None:203                self.rubric = rb.load_rubric(204                    problem_rubric,205                    self.rubric,206                    logging_level=self.logging_level)207            self._grade_compile_result()208            self.test_count = self._grade_test_result()209            self._grade_valgrind_result()210    def _grade_compile_result(self):211        compile_max = rb.get_warning_deduction_limit(self.rubric)212        compile_total = compile_max213        warnings = res.read_compile_result(self.compile_file, logging_level=self.logging_level)214        for warning in warnings:215            deduction = rb.get_warning_deduction(self.rubric, warning)216            if deduction <= compile_total or compile_max < 0:217                self._add_deduction(218                    deduction,219                    'Warning {}'.format(220                        res.CWARNING_OUTPUT.get(warning, ''))221                    )222                compile_total -= deduction223            else:224                self._add_deduction(225                    compile_total,226                    'Warning {} (should be -{} but warning deduction capped at -{})'.format(227                        res.CWARNING_OUTPUT.get(warning, ''),228                        deduction,229                        compile_max),230                    )231                compile_total = 0232    def _grade_test_result(self):233        test_names = res.read_formatted_result(234            self.formatted_file,235            logging_level=self.logging_level)236        for test_name, result, vresult in test_names:237            if result != res.ERESULT_PASS:238                self._add_deduction(239                    rb.get_test_deduction(self.rubric, test_name),240                    'Test {} {}'.format(test_name, res.ERESULT_OUTPUT.get(result, ''))241                    )242            if vresult == res.VRESULT_FAIL:243                self._add_valgrind_deduction()244        return len(test_names)245    def _add_valgrind_deduction(self):246        self.valgrind_error += 1247    def _grade_valgrind_result(self):248        if self.valgrind_error > 0:249            valgrind_max = rb.get_valgrind_deduction_limit(self.rubric)250            deduction = rb.get_valgrind_deduction(self.rubric, self.valgrind_error)251            if deduction <= valgrind_max or valgrind_max < 0:252                self._add_deduction(253                    rb.get_valgrind_deduction(self.rubric, self.valgrind_error),254                    'Valgrind error')255            else:256                self._add_deduction(257                    valgrind_max,258                    'Valgrind error (should be -{} but valgrind deduction capped at -{})'.format(259                        deduction,260                        valgrind_max),261                    )262    def _results_generated(self):263        return (self.compile_file is not None264                and self.result_file is not None265                and self.valgrind_file is not None266                and self.formatted_file is not None)267    def _add_deduction(self, points, comment):268        self.test_deductions.append((points, comment))269    def get_test_points(self):270        if self.test_points is None:271            total = 0272            for deduct in self.test_deductions:273                total += deduct[0]274            self.test_points = max(0, self.test_max - total)275        return self.test_points276    def write_score_breakdown(self, ostream):277        self.test_points = self.get_test_points()278        problem_title = 'Problem {num} ({name}) ({pt}/{total}):'.format(279            num=self.number,280            name=self.name,281            pt=round(self.test_points, 2),282            total=round(self.test_max, 2))283        md.write_header(ostream, problem_title, GRHLEVEL_PROBLEM)284        for deduct in self.test_deductions:285            deduction_text = '(-{pt} points) {comment}'.format(286                pt=round(deduct[0], 2),287                comment=deduct[1])288            md.write_list(ostream, deduction_text)289        if self.test_deductions:290            md.end_list(ostream)291####################################################################################################292# Purpose: Generate grade report for homework.293# Arguments:294#     homework:    Homework instance.295#     grader:      Grader information.296#     report_dir:  Directory where report is generated.297#     overwrite:   If grade report already exist, should it be overwritten.298####################################################################################################299def generate_grade_report(homework,300                          grader,301                          report_dir,302                          overwrite=False,303                          logging_level=log.LOGLEVEL_ERROR304                         ):305    report_filename = report_dir + 'GR{}_hw-username.md'.format(str(homework.number))306    if sysio.exist_file(report_filename):307        if not overwrite:308            log.log_error('Report {} already exists'.format(report_filename), logging_level)309            return310        else:311            log.log_warning('Overwriting existing report {}'.format(report_filename), logging_level)312    report_file = open(report_filename, mode='w')313    title = '## HW {num:02d} Test Case Grade Report'.format(num=homework.number)314    md.write_header(report_file, title, GRHLEVEL_TITLE)315    homework.write_score_breakdown(report_file)316    grader_text = 'HW Graded by: {}'.format(grader.get_grader_info())317    md.write_header(report_file, grader_text, GRHLEVEL_OTHER)318    md.write_paragraph(report_file, GRFOOTER)...parser.py
Source:parser.py  
1"""2:synopsis: Specialized :class:`ptp.libptp.parser.AbstractParser` classes for the tool Skipfish.3.. moduleauthor:: Tao Sauvage4"""5import re6import os7import js2py8from ptp.libptp import constants9from ptp.libptp.exceptions import NotSupportedVersionError, ReportNotFoundError10from ptp.libptp.parser import AbstractParser, FileParser11class SkipfishJSParser(AbstractParser):12    """Skipfish JS specialized parser."""13    __tool__ = 'skipfish'14    __format__ = 'js'15    __version__ = r'2\.10b'16    HIGH = 417    MEDIUM = 318    LOW = 219    WARNINGS = 120    INFO = 021    RANKING_SCALE = {22        HIGH: constants.HIGH,23        MEDIUM: constants.MEDIUM,24        LOW: constants.LOW,25        WARNINGS: constants.INFO,26        INFO: constants.INFO}27    _reportfile = 'samples.js'28    _metadatafile = 'summary.js'29    def __init__(self, pathname, light=False):30        """Initialize Skipfish JS parser.31        :param str pathname: Path to the report directory.32        :param bool light: `True` to only parse the ranking of the findings from the report.33        """34        self.search_directory = ''35        self.light = light36        metadatafile = self._recursive_find(pathname, self._metadatafile)37        if metadatafile:38            metadatafile = metadatafile[0]  # Only keep first instance (in case multiple match)39        reportfile = self._recursive_find(pathname, self._reportfile)40        if reportfile:41            self.search_directory = pathname42            reportfile = reportfile[0]  # Only keep first instance (in case multiple match)43        self.metadata_stream, self.report_stream = self.handle_file(metadatafile, reportfile)44        self.re_var_pattern = re.compile(r"var\s+(?P<variables>[a-zA-Z_0-9]+)\s+(?==)")45        self.re_metadata = re.compile(r"var\s+([a-zA-Z_0-9]+)\s+=\s+'{0,1}([^;']*)'{0,1};")46        self._re_reponse_status_code = re.compile(r"^HTTP.*?\/\d\.\d (\d+) .")47    @classmethod48    def handle_file(cls, metadatafile, reportfile):49        """Process the two report files of the Skipfish report.50        :param str metadatafile: Path to the metadata file.51        :param str reportfile: Path to the report file.52        :raises TypeError: if the files have not the right extension.53        :raises OSError: if an error occurs when reading the files.54        :raises IOError: if an error occurs when reading the files.55        :return: Both metadata and report files' contents.56        :rtype: :class:`tuple`57        """58        if not metadatafile.endswith(cls.__format__) or not reportfile.endswith(cls.__format__):59            raise TypeError("This parser only supports '%s' files" % cls.__format__)60        pathname, filename = os.path.split(metadatafile)61        metadata_stream = FileParser.handle_file(pathname=pathname, filename=filename)62        pathname, filename = os.path.split(reportfile)63        report_stream = FileParser.handle_file(pathname=pathname, filename=filename)64        return (metadata_stream, report_stream)65    @classmethod66    def is_mine(cls, pathname, light=False):67        """Check if it can handle the report file.68        :param str pathname: Path to the report directory.69        :param bool light: `True` to only parse the ranking of the findings from the report.70        :raises IOError: when the report file cannot be found.71        :raises OSError: when the report file cannot be found.72        :return: `True` if it supports the report, `False` otherwise.73        :rtype: :class:`bool`74        """75        metadatafile = cls._recursive_find(pathname, cls._metadatafile)76        if not metadatafile:77            return False78        metadatafile = metadatafile[0]79        reportfile = cls._recursive_find(pathname, cls._reportfile)80        if not reportfile:81            return False82        reportfile = reportfile[0]83        try:84            metadata_stream, report_stream = cls.handle_file(metadatafile, reportfile)85        except TypeError:86            return False87        return True88    def parse_metadata(self):89        """Retrieve the metadata of the report.90        :raises: :class:`NotSupportedVersionError` -- if it does not support this version of the report.91        :return: Dictionary containing the metadatas.92        :rtype: :class:`dict`93        .. note::94            In skipfish the metadata are saved into the summary.js file as follow:95            .. code-block:: js96                var sf_version = version<string>;97                var scan_date  = date<'Ddd Mmm d hh:mm:ss yyyy'>;98                var scan_seed  = scan seed<integer>99                var scan_ms    = elapsed time in ms<integer>;100        """101        re_result = self.re_metadata.findall(self.metadata_stream)102        metadata = dict({el[0]: el[1] for el in re_result})103        # Check if the version if the good one.104        if self.check_version(metadata, key='sf_version'):105            return metadata106        raise NotSupportedVersionError('PTP does NOT support this version of Skipfish.')107    def _parse_report_full(self, dir_list):108        """Parse HTTP requests from directories listed in the samples.js file.109        From all the directories, it reads request.dat and response.dat file and return a list of dict resquests and110        responses.111        """112        data = []113        for dirs in dir_list:114            try:115                with open(os.path.join(dirs['dir'], 'request.dat'), 'r') as req_data:116                    request = req_data.read()117            except IOError:118                request = "NOT_FOUND"119            try:120                with open(os.path.join(dirs['dir'], 'response.dat'), 'r') as res_data:121                    response = res_data.read()122                    response_status_code = self._re_reponse_status_code.findall(response)[0]123                    response_header, response_body = response.split('\n\n', 1)124            except IOError:125                response_body = response_header = response_status_code = "NOT_FOUND"126            # Somehow follow naming conventions from http://docs.python-requests.org/en/master/127            data.append({128                'request': request,129                'status_code': response_status_code,130                'headers': response_header,131                'body': response_body132            })133        return data134    def parse_report(self):135        """Retrieve the results from the report.136        :raises: :class:`ReportNotFoundError` -- if the report file was not found.137        :return: List of dicts where each one represents a discovery.138        :rtype: :class:`list`139        .. note::140            Example of retrieved data after conversion (i.e. `raw_report`) using the module :mod:`ast`:141            .. code-block:: js142                [{ 'severity': 3, 'type': 40402, 'samples': [143                    { 'url': 'http://demo.testfire.net/bank/login.aspx', 'extra': 'SQL syntax string', 'sid': '21010', 'dir': '_i2/0' },144                    { 'url': 'http://demo.testfire.net/bank/login.aspx', 'extra': 'SQL syntax string', 'sid': '21010', 'dir': '_i2/1' },145                    { 'url': 'http://demo.testfire.net/subscribe.aspx', 'extra': 'SQL syntax string', 'sid': '21010', 'dir': '_i2/2' } ]146                },]147        """148        REPORT_VAR_NAME = 'issue_samples'149        variables = self.re_var_pattern.findall(self.report_stream)150        split_data = self.report_stream.split(";")151        js_data = [data for data in split_data if data is not None]152        py_data = []153        format_data = {}  # Final python dict after converting js to py154        dirs = []  # List of directories of all urls155        # Converting js to py to make it simple to process156        for data in js_data:157            temp_data = js2py.eval_js(data)158            if temp_data is not None:159                py_data.append(temp_data)160        # Mapping variable to its content161        for i in range(len(py_data)):162            format_data[variables[i]] = py_data[i]163        if REPORT_VAR_NAME not in variables:164            raise ReportNotFoundError('PTP did NOT find issue_samples variable. Is this the correct file?')165        # We now have a raw version of the Skipfish report as a list of dict.166        self.vulns = [167            {'ranking': self.RANKING_SCALE[vuln['severity']]}168            for vuln in format_data[REPORT_VAR_NAME]]169        if not self.light:170            for var in variables:171                for item in format_data[var]:172                    for sample in item['samples']:173                        dirs.append({'url': sample['url'], 'dir': os.path.join(self.search_directory, sample['dir'])})174            self.vulns.append({'ranking': constants.UNKNOWN, 'transactions': self._parse_report_full(dirs)})...report.py
Source:report.py  
1"""Fourth stage: Report"""2# Standard library imports3import io4from datetime import datetime5# Third party imports6import panel as pn7import param8import pyplugs9# Geo:N:G imports10from app import config11from app.assets import panes12from app.assets import state13from geong_common import reports14from geong_common.log import logger15# Find name of app and stage16*_, PACKAGE, APP, STAGE = __name__.split(".")17CFG = config.app[PACKAGE][APP][STAGE]18class Model(param.Parameterized):19    """Data defining this stage"""20    # Values set by previous stage21    # Parameters for the current stage22    geox_id = param.String(label="GeoX ID")23    scenario_names = param.List()24    def __init__(self, scenario_names, *args, **kwargs):25        """Initialize values of stage"""26        super().__init__()27        self._report_cfg = config.app.report[CFG.report]28        self._state = state.get_user_state().setdefault(APP, {})29        self.scenario_names = scenario_names30        self.scenario_selector = pn.widgets.CrossSelector(31            name="Choose scenarios",32            value=self.scenario_names,33            options=self.scenario_names,34            definition_order=False,35        )36        self.scenarios = self._state["scenarios"].copy()37    @property38    def data(self):39        """Data made available for the final report"""40        return {41            "date": datetime.now(),42            "geox_id": self.geox_id,43            "scenario_list": "\n".join(self.scenario_selector.value),44            "scenarios": [45                dict(self.scenarios[s], scenario_name=s)46                for s in self.scenario_selector.value47            ],48        }49    @param.depends("geox_id")50    def download_report_button(self):51        """Button for downloading report"""52        return pn.widgets.FileDownload(53            name="Download report",54            button_type="success",55            callback=self.download_report,56            filename=self._report_cfg.replace("file_name", **self.data),57        )58    def download_report(self, *event):59        """Generate and download final report"""60        report_stream = io.BytesIO()61        reports.generate(62            self._report_cfg.generator,63            template_url=self._report_cfg.template,64            cfg=self._report_cfg,65            data=self.data,66            output_path=report_stream,67        )68        try:69            session_id = pn.state.curdoc.session_context.id70            logger.insights(71                f"New report with GeoXID: {self.geox_id}, SessionID: {session_id}"72            )73        except AttributeError as e:74            logger.insights(f"New report with GeoXID: {self.geox_id}")75            logger.error(f"SessionID not available: {e}")76        # Clear scenarios from state77        self._state["scenarios"].clear()78        # Reset report byte stream to start before passing it back79        report_stream.seek(0)80        return report_stream81class View:82    """Define the look and feel of the stage"""83    def panel(self):84        return pn.Column(85            panes.headline(CFG.label),86            self.scenario_selector,87            pn.Row(88                pn.widgets.TextInput.from_param(self.param.geox_id),89                self.download_report_button,90            ),91            sizing_mode="stretch_width",92        )93@pyplugs.register94class StageReport(Model, View):...helpers.py
Source:helpers.py  
1import configparser2import zeep3import logging4#import pandas as pd5#from pandas_gbq import gbq6import io7from lxml import etree8from zeep import Plugin9# Use this plugin as a kwarg for the zeep class to print SOAP messages10class LoggingPlugin(Plugin):11    def ingress(self, envelope, http_headers, operation):12        print(etree.tostring(envelope, pretty_print=True))13        return envelope, http_headers14    def egress(self, envelope, http_headers, operation, binding_options):15        print(etree.tostring(envelope, pretty_print=True))16        return envelope, http_headers17def backoff_hdlr(details):18    """ Prints backoff debug messages"""19    print("Backing off {wait:0.1f} seconds after {tries} tries "20          "calling function {target}".format(**details))21def backoff_hdlr_with_args(details):22    """ USE FOR DEBUGGING ONLY - Prints out all details about backoff events,23    including ultipro client object with credentials, to STDOUT24    """25    print("Backing off {wait:0.1f} seconds after {tries} tries "26          "calling function {target} with args {args} and kwargs "27          "{kwargs}".format(**details))28def read_conf(conf):29    parser = configparser.ConfigParser()30    parser.read(conf)31    rv = {}32    for section in parser.sections():33        for key, value in parser.items(section):34            rv[f"{section}.{key}"] = value35    return rv36def serialize(response):37    return zeep.helpers.serialize_object(response)38def write_file(report_stream, path):39    """Writes a stream to a file"""40    f = open(path, "w")41    f.write(report_stream)42    f.close()43#def csv_stream_to_dataframe(report_stream, delimiter):44#    """Reads a streaming csv into a pandas DataFrame."""45#    report = io.StringIO(report_stream)46#    df = pd.read_csv(report, sep=delimiter, encoding='utf-8', index_col=False)47#    return df48#49#def dataframe_to_bigquery(df):50#    """Writes a DataFrame to Big Query. Column names will be downcased and51#    all non-alphanumeric characters converted to underscores."""52#    df.columns = df.columns.str.strip().str.lower().str.replace(r'[^\w]', '_')53    # gbq.to_gbq(54    #   dataframe=df,55    #   destination_table='lake1.pond1',56    #   project_id='pubsub-bq-pipe-1',57    #   chunksize=10000,58    #   verbose=True,59    #   reauth=False,60    #   if_exists='append',61    #   private_key="./pubsub-bq-pipe-1-a865bbaa5f48.json",...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
