Best Python code snippet using slash
email.py
Source:email.py  
import os
import json
import traceback
import platform
from datetime import datetime
import smtplib
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
from email.mime.text import MIMEText
from email import encoders

# Settings file used when no explicit ``settings_file`` is supplied. It is
# looked up one directory above the directory that contains this module.
DEFAULT_SETTINGS_FILE = os.path.join(
    os.path.dirname(os.path.realpath(__file__)), os.pardir, 'email_settings.json')


class EmailError(Exception):
    """Base class for the errors raised by this module."""


class EmailInvalidSettingsError(EmailError):
    """Raised when a required email or SMTP setting is missing."""


def _as_str_list(value):
    """Normalise ``value`` to a new list of strings (``None`` stays ``None``).

    A single string is treated as a one-element list rather than being
    iterated character by character.
    """
    if value is None:
        return None
    if isinstance(value, str):
        value = [value]
    return [str(item) for item in value]


def _as_optional_str(value):
    """Return ``value`` converted to ``str`` (``None`` stays ``None``)."""
    if value is None:
        return None
    return value if isinstance(value, str) else str(value)


class Email():
    """ Create and send emails """

    def __init__(self,
                 to=(),
                 cc=(),
                 bcc=(),
                 subject=None,
                 body=None,
                 body_type='plain',
                 files=(),
                 images=(),
                 images_inline=False,
                 smtp_server=None,
                 smtp_timeout=None,
                 settings_file=None,
                 smtp_password=None,
                 smtp_username=None
                 ):
        """Store the message parts and resolve the SMTP settings.

        Args:
            to, cc, bcc: A single address or an iterable of addresses.
            subject: Subject line (converted to ``str``).
            body: Message body (converted to ``str``).
            body_type: MIME text subtype of the body, e.g. ``'plain'`` or
                ``'html'``.
            files: A path or an iterable of paths to attach as files.
            images: A path or an iterable of paths to attach as images.
            images_inline: When true, an ``<img>`` HTML part referencing
                each image is attached as well.
            smtp_server: SMTP host name.
            smtp_timeout: NOTE: despite its name this value is handed to
                ``smtplib.SMTP`` as its second positional argument, which
                is the *port*. The name is kept for compatibility.
            settings_file: JSON settings file. When omitted,
                ``DEFAULT_SETTINGS_FILE`` is used if it exists.
            smtp_password: Password used to log in.
            smtp_username: Login name; also used as the sender address.

        The sequence defaults are immutable tuples; the property setters
        always store a fresh list, so instances never share state.
        """
        self.to = to
        self.cc = cc
        self.bcc = bcc
        self.subject = subject
        self.body = body
        self.body_type = body_type
        self.files = files
        self.images = images
        self.images_inline = images_inline
        self.smtp_password = None
        self.smtp_username = None
        self.smtp_server = None
        self.smtp_timeout = None
        if settings_file and not os.path.isfile(settings_file):
            # Only logged here; opening the file below raises the actual
            # exception.
            self.error(f'Cannot open settings file {settings_file}')
        if not settings_file and os.path.isfile(DEFAULT_SETTINGS_FILE):
            settings_file = DEFAULT_SETTINGS_FILE
        if settings_file:
            self.load_settings_file(settings_file)
        # Priorities are: Arguments, file options
        if smtp_password:
            self.smtp_password = smtp_password
        if smtp_username:
            self.smtp_username = smtp_username
        # SMTP Username is also used as send sender email address
        # Priorities are: Arguments, file options, default options
        self.smtp_server = smtp_server or self.smtp_server or 'smtp.office365.com'
        self.smtp_timeout = smtp_timeout or self.smtp_timeout or 587

    def load_settings_file(self, file):
        """Load SMTP settings from the JSON file ``file``.

        The file must contain an ``smtp`` object with ``email`` and
        ``password`` keys; ``server`` and ``timeout`` are optional.

        Raises:
            EmailInvalidSettingsError: If a required key is missing.
        """
        self.log(f"Attempting to load email settings from file {file}")
        with open(file, 'r') as f:
            settings = json.load(f)
        # Required settings
        if 'smtp' not in settings:
            raise EmailInvalidSettingsError(f'No SMTP settings in file: {file}')
        smtp_settings = settings['smtp']
        if 'email' not in smtp_settings:
            raise EmailInvalidSettingsError(f'No SMTP email in settings file: {file}')
        if 'password' not in smtp_settings:
            raise EmailInvalidSettingsError(f'No SMTP password in settings file: {file}')
        self.smtp_username = smtp_settings['email']
        self.smtp_password = smtp_settings['password']
        # Optional settings
        if 'server' in smtp_settings:
            self.smtp_server = smtp_settings['server']
        if 'timeout' in smtp_settings:
            self.smtp_timeout = smtp_settings['timeout']

    # Property setters for data sanitation

    @property
    def cc(self):
        return self._cc

    @cc.setter
    def cc(self, cc):
        self._cc = _as_str_list(cc)

    @property
    def bcc(self):
        return self._bcc

    @bcc.setter
    def bcc(self, bcc):
        self._bcc = _as_str_list(bcc)

    @property
    def to(self):
        return self._to

    @to.setter
    def to(self, to):
        self._to = _as_str_list(to)

    @property
    def body(self):
        return self._body

    @body.setter
    def body(self, body):
        self._body = _as_optional_str(body)

    @property
    def body_type(self):
        return self._body_type

    @body_type.setter
    def body_type(self, body_type):
        self._body_type = _as_optional_str(body_type)

    @property
    def subject(self):
        return self._subject

    @subject.setter
    def subject(self, subject):
        self._subject = _as_optional_str(subject)

    @property
    def files(self):
        return self._files

    @files.setter
    def files(self, files):
        self._files = _as_str_list(files)

    @property
    def images(self):
        return self._images

    @images.setter
    def images(self, images):
        self._images = _as_str_list(images)

    @property
    def smtp_server(self):
        return self._smtp_server

    @smtp_server.setter
    def smtp_server(self, smtp_server):
        self._smtp_server = _as_optional_str(smtp_server)

    @property
    def smtp_password(self):
        return self._smtp_password

    @smtp_password.setter
    def smtp_password(self, smtp_password):
        self._smtp_password = _as_optional_str(smtp_password)

    @property
    def smtp_username(self):
        # An empty user name is reported as None.
        return self._smtp_username or None

    @smtp_username.setter
    def smtp_username(self, smtp_username):
        self._smtp_username = _as_optional_str(smtp_username)

    def log(self, message):
        """ Very basic logging function. Should change in future """
        print(message)

    def error(self, message, exception=None):
        """ Very basic error function. Should change in future

        Logs ``message`` and, when an exception class is given, raises
        ``exception(message)``. Without ``exception`` the call only logs.
        """
        self.log('ERROR: {}'.format(message))
        if exception:
            raise exception(message)

    def send(self):
        """ Validate settings, compose message, and send

        Raises:
            EmailInvalidSettingsError: If validation fails.
            smtplib.SMTPException: Logged, then re-raised unchanged.
        """
        self._validate()
        msg = self._compose()
        try:
            self._send(msg)
        except smtplib.SMTPException as e:
            self.error(str(e))
            # Bare raise keeps the original traceback intact.
            raise

    def _send(self, msg):
        """ Send email with current settings """
        # Build the envelope on a copy: extending ``self.to`` in place
        # would leave cc/bcc addresses in the "To" list of this object.
        recipients = list(self.to)
        recipients.extend(self.cc or [])
        recipients.extend(self.bcc or [])
        # The second argument of smtplib.SMTP is the port (see __init__).
        with smtplib.SMTP(self.smtp_server, self.smtp_timeout) as smtp:
            smtp.starttls()
            smtp.login(self.smtp_username, self.smtp_password)
            smtp.sendmail(self.smtp_username, recipients, msg.as_string())

    def _compose(self):
        """ Combine saved settings into a MIME multipart message """
        msg = MIMEMultipart('mixed')
        msg.preamble = 'This is a multi-part message in MIME format.'
        msg['To'] = ', '.join(self.to)
        if self.cc:
            msg['Cc'] = ', '.join(self.cc)
        # No "Bcc" header on purpose: smtplib.sendmail() transmits the
        # message text as given, so such a header would reveal the blind
        # copies to every recipient. Bcc addresses are delivered through
        # the envelope recipient list built in _send().
        msg['From'] = self.smtp_username
        msg['Subject'] = self.subject
        msg.attach(MIMEText(self.body, self.body_type))
        for part in self._make_attachments():
            msg.attach(part)
        return msg

    def _validate(self):
        """ Check email has all required settings

        Raises:
            EmailInvalidSettingsError: On the first missing setting or
                missing attachment file.
        """
        required = (
            (self.to, "To address is not set"),
            (self.smtp_username, "smtp username not set"),
            (self.smtp_password, "smtp password not set"),
            (self.smtp_server, "smtp server not set"),
            (self.smtp_timeout, "smtp timeout not set"),
        )
        for value, message in required:
            if not value:
                self.error(message, EmailInvalidSettingsError)
        # Check all attachments exist (files/images may have been set to None)
        for a in (self.files or []) + (self.images or []):
            if not os.path.isfile(a):
                self.error(f"Cannot find file to attach: {a}", EmailInvalidSettingsError)

    def _make_attachments(self):
        """ Create attachment parts for files and images

        if self.images_inline is True, then inline html for the image
        is inserted

        Files or images that cannot be read are logged and skipped.

        returns a list of attachment parts
        """
        attachments = []
        for f in self.files or []:
            # Attach files as attachments
            try:
                with open(f, 'rb') as fp:
                    part = MIMEBase('application', 'octet-stream')
                    part.set_payload(fp.read())
            except IOError:
                self.error("Could not open file for attachment: {}".format(f))
                continue
            encoders.encode_base64(part)
            # Passing the name as a header parameter lets the email
            # package quote it, so names with spaces survive.
            part.add_header(
                'Content-Disposition', 'attachment',
                filename=os.path.basename(f))
            attachments.append(part)
        for i in self.images or []:
            # Attach images
            ibn = os.path.basename(i)
            cid = 'image_{}'.format(ibn.replace('.', '_'))
            try:
                with open(i, 'rb') as fp:
                    part = MIMEImage(fp.read(), name=ibn)
            except IOError:
                self.error("Could not open image for attachment: {}".format(i))
                continue
            part.add_header('Content-ID', '<{}>'.format(cid))
            attachments.append(part)
            if self.images_inline:
                attachments.append(MIMEText(
                    u'<img src="cid:{}" alt="{}">'.format(cid, ibn),
                    'html', 'utf-8')
                )
        return attachments


def send_exception_email(exception, recipients=None, board=None,
                         subject=None, prepend_body=None, settings_file=None):
    """Build an HTML report about ``exception`` for the maintainers.

    When ``recipients`` is not given, the ``maintainers`` entry of the
    settings file is used; if that file does not exist the function returns
    without doing anything. The report contains the exception class, its
    message, the current time, ``traceback.format_exc()`` and the platform
    description, plus board details and log files when ``board`` is given.

    Only the part of this function visible in the listing is reproduced
    here; the listing is cut off after the board logging calls.

    Raises:
        EmailInvalidSettingsError: If no recipients are given and the
            settings file has no ``maintainers`` entry.
    """
    settings_file = settings_file or DEFAULT_SETTINGS_FILE
    if not recipients:
        if not os.path.exists(settings_file):
            return
        with open(settings_file, 'r') as f:
            settings = json.load(f)
            if 'maintainers' not in settings:
                raise EmailInvalidSettingsError(
                    'recipents not given and "maintainers" not in settings file: {}'.format(
                        settings_file))
            recipients = settings['maintainers']
    # NOTE(review): ``settings_file`` is not forwarded to Email(), so a
    # custom file is only used for the maintainers lookup above - confirm
    # whether that is intended.
    email = Email(
        to=recipients,
        files=[],
        body='',
        body_type='html'
    )
    error_info = {
        'exception': exception.__class__.__name__,
        'cause': str(exception),
        'time': datetime.now().strftime('%d-%m-%y %H:%M:%S'),
        'trace': traceback.format_exc()
    }
    if prepend_body:
        email.body += '{}<br><hr><br>'.format(prepend_body)
    email.body += '''
        <b>Exception:</b> {}<br>
        <b>Cause:</b> {}<br>
        <b>Time:</b> {}<br>
        <b>Trace:</b> {}<br>
        <hr>
        '''.format(
        error_info['exception'],
        error_info['cause'],
        error_info['time'],
        '<br>'.join(error_info['trace'].split('\n')))
    email.body += '''
        <b>Platform: </b>{}<br><hr>
        '''.format('<br>'.join(list(str(platform.uname()).split(','))))
    if subject:
        email.subject = subject
    else:
        email.subject = 'Unhandled Exception Occured: [{}]'.format(
            error_info['exception'])
        if board:
            email.subject = ('{} [{}]'.format(email.subject, board.name))
    if board:
        email.body += '<b>Board Info:</b><br>'
        email.body += '<br>'.join(board.show_hier().split('\n'))
        email.body += '<hr><br>'
        email.files.append(board.log_file)
        if board.console and board.console.log_file:
            email.files.append(board.console.log_file)
        board.log(email.subject, color='red', bold=True)
        board.log(str(error_info), color='red', bold=True)
        board.log('Informing lab maintainers via email')
    # ... (the listing is truncated here; the rest of the function is not shown)

# Next listing on the page: loop_node.py
Source:loop_node.py  
# ... (the listing starts in the middle of a class; the class header and the
# ``def`` line of the method ending below are not shown)
        self.invariants.insert(0, invariant)

    def append_invariants(self, invariants: List[Expr]) -> None:
        """Append ``invariants`` to the invariants list."""
        self.invariants.extend(invariants)

    def prepend_body(self, statements: List[Stmt]) -> None:
        """Prepend ``statements`` to body."""
        # Slice assignment inserts all statements at index 0, keeping
        # their relative order.
        self.body[0:0] = statements

    def append_body(self, statement: Stmt) -> None:
        """Append ``statement`` to body."""
        self.body.append(statement)


class ObligationLoopNodeConstructor(StatementNodeConstructorBase):
    """A class that creates a while loop node with obligation stuff."""

    def __init__(
            self, obligation_loop: ObligationLoop,
            node: Union[ast.While, ast.For],
            translator: 'AbstractTranslator', ctx: Context,
            obligation_manager: ObligationManager) -> None:
        """Derive position and info from ``node`` and keep the loop data."""
        position = translator.to_position(node, ctx)
        info = translator.no_info(ctx)
        super().__init__(
            translator, ctx, obligation_manager, position, info, node)
        self._obligation_loop = obligation_loop
        self._node = node

    def get_statements(self) -> List[Stmt]:
        """Get all generated statements."""
        return self._statements

    def construct_loop(self) -> None:
        """Construct statements to perform a loop."""
        # NOTE(review): the order of these calls looks significant - several
        # steps prepend to the loop body or append invariants, so reordering
        # them would presumably change the generated loop. Confirm before
        # rearranging.
        self._add_current_wait_level()
        self._add_additional_invariants()
        self._add_leak_check()
        self._set_up_measures()
        self._bound_obligations()
        self._save_must_terminate_amount(
            self._loop_obligation_info.original_must_terminate_var)
        self._save_loop_termination()
        self._set_loop_check_before()
        self._set_loop_check_after_body()
        self._check_loop_preserves_termination()
        self._prepend_loop_body()
        self._add_loop()
        self._add_additional_statements_after_loop()
        self._reset_must_terminate(
            self._loop_obligation_info.original_must_terminate_var)

    def _add_current_wait_level(self) -> None:
        """Inhale assumptions about current wait-level variable."""
        if obligation_config.disable_waitlevel_check:
            return
        # The surrounding residue level comes from the enclosing loop when
        # there is one, otherwise from the current function.
        context_info = self._ctx.obligation_context.get_surrounding_loop_info()
        if context_info:
            context_residue_level = context_info.residue_level
        else:
            method_info = self._ctx.current_function.obligation_info
            context_residue_level = method_info.residue_level
        invariant = self._translator.initialize_current_wait_level(
            sil.PermVar(self._loop_obligation_info.residue_level),
            sil.PermVar(context_residue_level),
            self._ctx)
        translated = invariant.translate(
            self._translator, self._ctx, self._position, self._info)
        self._obligation_loop.prepend_invariant(translated)

    def _add_additional_invariants(self) -> None:
        """Add additional invariants from the obligation info."""
        self._obligation_loop.append_invariants(
            self._loop_obligation_info.get_additional_invariants())

    def _add_leak_check(self) -> None:
        """Add leak checks to invariant."""
        reference_name = self._ctx.actual_function.get_fresh_name('_r')
        leak_check = self._obligation_manager.create_leak_check(reference_name)
        loop_check_before = sil.BoolVar(
            self._loop_obligation_info.loop_check_before_var)
        termination_flag = sil.BoolVar(
            self._loop_obligation_info.termination_flag_var)
        if not obligation_config.disable_loop_context_leak_check:
            must_terminate = self._obligation_manager.must_terminate_obligation
            predicate = must_terminate.create_predicate_access(
                self._obligation_info.current_thread_var)
            termination_leak_check = sil.CurrentPerm(predicate) == sil.NoPerm()
            loop_cond = self._loop_obligation_info.construct_loop_condition()
            # Only constrains the exhale side (the inhale side is true):
            # before the loop, either the termination flag holds, the loop
            # condition is false, or both leak checks hold.
            before_loop_leak_check = sil.InhaleExhale(
                sil.TrueLit(),
                sil.Implies(
                    loop_check_before,
                    sil.BigOr([
                        termination_flag,
                        sil.Not(loop_cond),
                        sil.BigAnd([termination_leak_check, leak_check])
                    ])
                )
            )
            info = self._to_info('Leak check for context.')
            position = self._to_position(
                conversion_rules=rules.OBLIGATION_LOOP_CONTEXT_LEAK_CHECK_FAIL)
            self._obligation_loop.append_invariants([
                before_loop_leak_check.translate(
                    self._translator, self._ctx, position, info)])
        if not obligation_config.disable_loop_body_leak_check:
            # When not before the loop, the leak check itself must hold.
            body_leak_check = sil.InhaleExhale(
                sil.TrueLit(),
                sil.Implies(sil.Not(loop_check_before), leak_check))
            info = self._to_info('Leak check for loop body.')
            position = self._to_position(
                conversion_rules=rules.OBLIGATION_LOOP_BODY_LEAK_CHECK_FAIL)
            self._obligation_loop.append_invariants([
                body_leak_check.translate(
                    self._translator, self._ctx, position, info)])

    def _set_up_measures(self) -> None:
        """Create and initialize loop's measure map."""
        if obligation_config.disable_measures:
            return
        # Set up measures.
        loop_measure_map = self._loop_obligation_info.loop_measure_map
        instances = self._loop_obligation_info.get_all_instances()
        statements = loop_measure_map.initialize(
            instances, self._translator, self._ctx)
        self._obligation_loop.prepend_body(statements)

    def _bound_obligations(self) -> None:
        """Convert all unbounded obligations to bounded ones."""
        statements = bound_obligations(
            self._loop_obligation_info.get_all_instances(),
            self._translator, self._ctx, self._position, self._info)
        self._obligation_loop.prepend_body(statements)

    def _save_loop_termination(self) -> None:
        """Save if loop promises to terminate into a variable."""
        if obligation_config.disable_termination_check:
            return
        assign = sil.Assign(
            self._loop_obligation_info.termination_flag_var,
            self._loop_obligation_info.create_termination_check(True))
        info = self._to_info('Save loop termination promise.')
        self._append_statement(assign, info=info)

    def _set_loop_check_before(self) -> None:
        """Set the variable indicating that we are before loop."""
        if obligation_config.disable_all:
            return
        assign = sil.Assign(
            self._loop_obligation_info.loop_check_before_var,
            sil.TrueLit())
        info = self._to_info('We are before loop.')
        self._append_statement(assign, info=info)

    def _set_loop_check_after_body(self) -> None:
        """Set the variable indicating that we are after loop body."""
        if obligation_config.disable_all:
            return
        # Same variable as in _set_loop_check_before, now set to false and
        # appended to the loop body instead of the surrounding statements.
        assign = sil.Assign(
            self._loop_obligation_info.loop_check_before_var,
            sil.FalseLit())
        info = self._to_info('We are after loop body.')
        statement = assign.translate(
            self._translator, self._ctx, self._position, info)
        self._obligation_loop.append_body(statement)

    def _check_loop_preserves_termination(self) -> None:
        """Check that loop keeps the promise to terminate."""
        if obligation_config.disable_termination_check:
            return
        # If the termination flag is set, then either the loop condition is
        # false or the termination check still holds.
        check = sil.Implies(
            sil.BoolVar(self._loop_obligation_info.termination_flag_var),
            sil.BigOr([
                sil.Not(self._loop_obligation_info.construct_loop_condition()),
                self._loop_obligation_info.create_termination_check(False)
            ])
        )
        assertion = sil.Assert(check)
        position = self._to_position(
            conversion_rules=rules.OBLIGATION_LOOP_TERMINATION_PROMISE_FAIL)
        comment = 'Check if loop continues to terminate.'
        # The extended Viper AST gets a SIFInfo carrying the comment; any
        # other AST gets the regular info object.
        if isinstance(self._viper, ViperASTExtended):
            info = self._viper.SIFInfo([comment], continue_unaware=True)
        else:
            info = self._to_info(comment)
        statement = assertion.translate(
            self._translator, self._ctx, position, info)
        self._obligation_loop.append_body(statement)

    def _prepend_loop_body(self) -> None:
        """Prepend the obligation info's extra statements to the loop body."""
        self._obligation_loop.prepend_body(
            self._loop_obligation_info.get_prepend_body())

    def _add_loop(self) -> None:
        """Add the actual loop node."""
        body_block = self._translator.translate_block(
            self._obligation_loop.body, self._position, self._info)
        loop = self._viper.While(
            self._obligation_loop.condition, self._obligation_loop.invariants,
            self._obligation_loop.local_vars, body_block,
            self._position, self._info)
        self._statements.append(loop)

    def _add_additional_statements_after_loop(self) -> None:
        """Add additional statements after loop from the obligation info."""
        self._statements.extend(
            self._loop_obligation_info.get_after_loop())

    @property
    # ... (the listing is truncated here; the decorated property is not shown)

# Next listing on the page: test_add_error.py
Source:test_add_error.py  
from uuid import uuid4
import pytest
import logbook


@pytest.mark.parametrize('failure_type', ['failure', 'error'])
@pytest.mark.parametrize('use_custom_message', [True, False])
def test_add_error_custom_exc_info(suite, suite_test, failure_type, use_custom_message):
    """An explicit ``exc_info`` is reported instead of the active exception.

    The generated code captures the ``ZeroDivisionError`` info, then calls
    ``slash.add_error`` / ``slash.add_failure`` while handling a different
    exception (``AttributeError``).
    """
    message = str(uuid4())
    if use_custom_message:
        add_error_args = '{!r}, '.format(message)
    else:
        add_error_args = ''
    add_error_args += 'exc_info=exc_info'
    code = """
import sys
try:
    1/0
except ZeroDivisionError:
    exc_info = sys.exc_info()
    try:
        None.a = 2
    except AttributeError:
        slash.add_{0}({1})
    """.format(failure_type, add_error_args)
    for line in code.strip().splitlines():
        suite_test.append_line(line)
    if failure_type == 'error':
        suite_test.expect_error()
    else:
        suite_test.expect_failure()
    summary = suite.run()
    [result] = summary.get_all_results_for_test(suite_test)
    if failure_type == 'error':
        [err] = result.get_errors()
    else:
        [err] = result.get_failures()
    assert err.has_custom_message() == use_custom_message
    if use_custom_message:
        assert err.message == message
    # The reported exception is the captured one, not the AttributeError.
    assert err.exception_type is ZeroDivisionError
    assert err.traceback.frames


def test_add_error_that_forbids_setattr(suite, suite_test):
    """An exception whose ``__setattr__`` raises is still recorded as an error."""
    @suite_test.append_body
    def __code__(): # pylint: disable=unused-variable
        class MyException(Exception):
            def __setattr__(self, *args):
                raise Exception("Set-attr")
        raise MyException("Special Message")
    suite_test.expect_error()
    summary = suite.run()
    [result] = summary.get_all_results_for_test(suite_test)
    [err] = result.get_errors()
    assert "Special Message" in err.message  # err.message contains the full repr
    assert err.exception_type.__name__ == "MyException"


def test_add_fatal_error(suite, suite_test):
    """A fatal error stops the tests after it from running."""
    @suite_test.append_body
    def __code__():             # pylint: disable=unused-variable
        slash.add_error('bla').mark_fatal() # pylint: disable=undefined-variable
    suite_test.expect_error()
    for test in suite.iter_all_after(suite_test):
        test.expect_not_run()
    session = suite.run().session
    assert session.results.has_fatal_errors()


def test_session_level_add_error_message(suite, suite_test):
    """An error added from a ``session_end`` hook lands in the global result."""
    @suite_test.file.append_body
    def __code__():                       # pylint: disable=unused-variable
        @slash.hooks.session_end.register # pylint: disable=undefined-variable
        def _callback():
            slash.add_error('session: add_error') # pylint: disable=undefined-variable
    res = suite.run(expect_session_errors=True)
    errors = res.session.results.global_result.get_errors()
    assert len(errors) == 1
    [err] = errors
    assert err.message == 'session: add_error'


@pytest.mark.parametrize('log_variables', [True, False])
def test_add_error_log_traceback_variables(suite, suite_test, log_variables, config_override, tmpdir):
    """Traceback variables appear in the log only when the option is enabled."""
    config_override('log.core_log_level', logbook.TRACE)
    config_override('log.traceback_variables', log_variables)
    config_override('log.root', str(tmpdir.join('logs')))
    @suite_test.prepend_body
    def __code__():          # pylint: disable=unused-variable
        # to avoid the line itself from being detected
        x_variable = 'x' * 3 # pylint: disable=unused-variable
        class Object(object):
            def __init__(self):
                self.property_value = 'yyy'
        self = Object() # pylint: disable=unused-variable
    suite_test.when_run.error()
    res = suite.run()
    result = res[suite_test]
    with open(result.get_log_path()) as f:
        lines = f.readlines()
    def _search_variable(variable_name, variable_value):
        # A variable counts as found when a single log line contains both
        # its name and its value.
        found = False
        for line in lines:
            if variable_name in line and variable_value in line:
                found = True
                break
        assert found == log_variables, 'Variable {!r} not found in traceback log!'.format(variable_name)
    _search_variable('x_variable', 'xxx')
    _search_variable('self.property_value', 'yyy')


def test_add_error_log_traceback_variables_self_none(suite, suite_test, config_override, tmpdir):
    """A local named ``self`` that is ``None`` is logged as ``self: None``."""
    config_override('log.core_log_level', logbook.TRACE)
    config_override('log.traceback_variables', True)
    config_override('log.root', str(tmpdir.join('logs')))
    @suite_test.prepend_body
    def __code__():          # pylint: disable=unused-variable
        # to avoid the line itself from being detected
        self = None # pylint: disable=unused-variable
    suite_test.when_run.error()
    res = suite.run()
    result = res[suite_test]
    with open(result.get_log_path()) as f:
        lines = f.read()
    assert 'self: None' in lines
    [err] = result.get_errors()
    # ... (the listing is truncated here; the rest of the test is not shown)

# Text that follows the listing on the source page:
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
