How to use prepend_body method in Slash

Best Python code snippet using slash

email.py

Source:email.py Github

copy

Full Screen

import os
import json
import traceback
import platform
from datetime import datetime
import smtplib
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
from email.mime.text import MIMEText
from email import encoders

# Settings file looked up when no explicit ``settings_file`` is given:
# ``email_settings.json`` in the parent directory of this module.
DEFAULT_SETTINGS_FILE = os.path.join(
    os.path.dirname(os.path.realpath(__file__)), os.pardir, 'email_settings.json')


class EmailError(Exception):
    """ Base class for the errors raised by this module """
    pass


class EmailInvalidSettingsError(EmailError):
    """ Raised when required email/SMTP settings are missing or invalid """
    pass


def _as_str_list(value):
    """ Normalise ``value`` to a list of strings; ``None`` stays ``None`` """
    if value is None:
        return None
    if isinstance(value, str):
        # A bare string is one entry, not an iterable of characters
        value = [value]
    return [str(item) for item in value]


def _as_optional_str(value):
    """ Normalise ``value`` to a string; ``None`` stays ``None`` """
    if value is None or isinstance(value, str):
        return value
    return str(value)


class Email():
    """ Create and send emails

    Recipients (``to``/``cc``/``bcc``) and attachments (``files``/``images``)
    accept a single string or an iterable and are stored as lists of strings.
    SMTP settings are resolved in this order: constructor arguments, then the
    settings file, then the built-in defaults.
    """

    def __init__(self,
                 to=(),
                 cc=(),
                 bcc=(),
                 subject=None,
                 body=None,
                 body_type='plain',
                 files=(),
                 images=(),
                 images_inline=False,
                 smtp_server=None,
                 smtp_timeout=None,
                 settings_file=None,
                 smtp_password=None,
                 smtp_username=None
                 ):
        """ Store the message parts and resolve the SMTP settings

        ``settings_file`` is a JSON file read by ``load_settings_file``; when
        it is not given, ``DEFAULT_SETTINGS_FILE`` is used if it exists.

        NOTE: despite its name, ``smtp_timeout`` is handed to
        ``smtplib.SMTP`` as its second positional argument, which is the
        port (hence the default of 587).
        """
        # Immutable tuple defaults; the property setters copy them into
        # fresh lists, so instances never share state.
        self.to = to
        self.cc = cc
        self.bcc = bcc
        self.subject = subject
        self.body = body
        self.body_type = body_type
        self.files = files
        self.images = images
        self.images_inline = images_inline
        self.smtp_password = None
        self.smtp_username = None
        self.smtp_server = None
        self.smtp_timeout = None
        if settings_file and not os.path.isfile(settings_file):
            # Only logged here; the open() in load_settings_file raises
            self.error(f'Cannot open settings file {settings_file}')
        if not settings_file and os.path.isfile(DEFAULT_SETTINGS_FILE):
            settings_file = DEFAULT_SETTINGS_FILE
        if settings_file:
            self.load_settings_file(settings_file)
        # Priorities are: Arguments, file options
        if smtp_password:
            self.smtp_password = smtp_password
        if smtp_username:
            self.smtp_username = smtp_username
        # SMTP Username is also used as send sender email address
        # Priorities are: Arguments, file options, default options
        self.smtp_server = smtp_server or self.smtp_server or 'smtp.office365.com'
        self.smtp_timeout = smtp_timeout or self.smtp_timeout or 587

    def load_settings_file(self, file):
        """ Load SMTP settings from the JSON file ``file``

        The file must contain an ``smtp`` object with ``email`` and
        ``password`` keys; ``server`` and ``timeout`` are optional.

        Raises EmailInvalidSettingsError when a required key is missing.
        """
        self.log(f"Attempting to load email settings from file {file}")
        with open(file, 'r') as f:
            settings = json.load(f)
        # Required settings
        if 'smtp' not in settings:
            raise EmailInvalidSettingsError(f'No SMTP settings in file: {file}')
        if 'email' not in settings['smtp']:
            raise EmailInvalidSettingsError(f'No SMTP email in settings file: {file}')
        if 'password' not in settings['smtp']:
            raise EmailInvalidSettingsError(f'No SMTP password in settings file: {file}')
        self.smtp_username = settings['smtp']['email']
        self.smtp_password = settings['smtp']['password']
        # Optional settings
        if 'server' in settings['smtp']:
            self.smtp_server = settings['smtp']['server']
        if 'timeout' in settings['smtp']:
            self.smtp_timeout = settings['smtp']['timeout']

    # Property setters for data sanitation
    @property
    def cc(self):
        return self._cc

    @cc.setter
    def cc(self, cc):
        self._cc = _as_str_list(cc)

    @property
    def bcc(self):
        return self._bcc

    @bcc.setter
    def bcc(self, bcc):
        self._bcc = _as_str_list(bcc)

    @property
    def to(self):
        return self._to

    @to.setter
    def to(self, to):
        self._to = _as_str_list(to)

    @property
    def body(self):
        return self._body

    @body.setter
    def body(self, body):
        self._body = _as_optional_str(body)

    @property
    def body_type(self):
        return self._body_type

    @body_type.setter
    def body_type(self, body_type):
        self._body_type = _as_optional_str(body_type)

    @property
    def subject(self):
        return self._subject

    @subject.setter
    def subject(self, subject):
        self._subject = _as_optional_str(subject)

    @property
    def files(self):
        return self._files

    @files.setter
    def files(self, files):
        self._files = _as_str_list(files)

    @property
    def images(self):
        return self._images

    @images.setter
    def images(self, images):
        self._images = _as_str_list(images)

    @property
    def smtp_server(self):
        return self._smtp_server

    @smtp_server.setter
    def smtp_server(self, smtp_server):
        self._smtp_server = _as_optional_str(smtp_server)

    @property
    def smtp_password(self):
        return self._smtp_password

    @smtp_password.setter
    def smtp_password(self, smtp_password):
        self._smtp_password = _as_optional_str(smtp_password)

    @property
    def smtp_username(self):
        # An empty username is reported as None, explicitly
        return self._smtp_username or None

    @smtp_username.setter
    def smtp_username(self, smtp_username):
        self._smtp_username = _as_optional_str(smtp_username)

    def log(self, message):
        """ Very basic logging function. Should change in future """
        print(message)

    def error(self, message, exception=None):
        """ Very basic error function. Should change in future

        Logs ``message``; when an ``exception`` class is given, raises it
        with ``message``.
        """
        self.log('ERROR: {}'.format(message))
        if exception:
            raise exception(message)

    def send(self):
        """ Validate settings, compose message, and send

        Raises EmailInvalidSettingsError when validation fails, and
        re-raises any smtplib.SMTPException after logging it.
        """
        self._validate()
        msg = self._compose()
        try:
            self._send(msg)
        except smtplib.SMTPException as e:
            self.error(str(e))
            # Bare raise keeps the original traceback intact
            raise

    def _recipients(self):
        """ Return a new list holding every to, cc and bcc address """
        recipients = list(self.to or [])
        recipients.extend(self.cc or [])
        recipients.extend(self.bcc or [])
        return recipients

    def _send(self, msg):
        """ Send email with current settings """
        with smtplib.SMTP(self.smtp_server, self.smtp_timeout) as smtp:
            smtp.starttls()
            smtp.login(self.smtp_username, self.smtp_password)
            text = msg.as_string()
            # Build a fresh envelope list: extending self.to in place would
            # permanently add the cc/bcc addresses to the To field.
            smtp.sendmail(self.smtp_username, self._recipients(), text)

    def _compose(self):
        """ Combine saved settings into a MIME multipart message """
        msg = MIMEMultipart('mixed')
        msg.preamble = 'This is a multi-part message in MIME format.'
        msg['To'] = ', '.join(self.to)
        if self.cc:
            msg['Cc'] = ', '.join(self.cc)
        # No Bcc header on purpose: sendmail() transmits the headers
        # verbatim, so a Bcc header would show the blind-copy addresses to
        # every recipient. They are delivered via the envelope in _send().
        msg['From'] = self.smtp_username
        if self.subject is not None:
            msg['Subject'] = self.subject
        # An unset body/body_type would make MIMEText raise
        msg.attach(MIMEText(self.body or '', self.body_type or 'plain'))
        for attachment in self._make_attachments():
            msg.attach(attachment)
        return msg

    def _validate(self):
        """ Check email has all required settings

        Raises EmailInvalidSettingsError on the first missing setting or
        missing attachment file.
        """
        if not self.to:
            self.error("To address is not set", EmailInvalidSettingsError)
        if not self.smtp_username:
            self.error("smtp username not set", EmailInvalidSettingsError)
        if not self.smtp_password:
            self.error("smtp password not set", EmailInvalidSettingsError)
        if not self.smtp_server:
            self.error("smtp server not set", EmailInvalidSettingsError)
        if not self.smtp_timeout:
            self.error("smtp timeout not set", EmailInvalidSettingsError)
        # Check all attachments exist (files/images may have been set to None)
        for a in (self.files or []) + (self.images or []):
            if not os.path.isfile(a):
                self.error(f"Cannot find file to attach: {a}", EmailInvalidSettingsError)

    def _make_attachments(self):
        """ Create attachment parts for files and images

        if self.images_inline is True, then inline html for the image
        is inserted

        Files or images that cannot be read are logged and skipped.

        returns a list of attachment parts
        """
        attachments = []
        for f in self.files or []:
            # Attach files as attachments
            try:
                with open(f, 'rb') as fp:
                    payload = fp.read()
            except IOError:
                self.error("Could not open file for attachment: {}".format(f))
                continue
            part = MIMEBase('application', 'octet-stream')
            part.set_payload(payload)
            encoders.encode_base64(part)
            # Passing filename as a parameter lets the email package quote
            # it, so names containing spaces or separators survive.
            part.add_header(
                'Content-Disposition',
                'attachment',
                filename=os.path.basename(f))
            attachments.append(part)
        for i in self.images or []:
            # Attach images
            ibn = os.path.basename(i)
            cid = 'image_{}'.format(ibn.replace('.', '_'))
            try:
                with open(i, 'rb') as fp:
                    part = MIMEImage(fp.read(), name=ibn)
            except IOError:
                self.error("Could not open image for attachment: {}".format(i))
                continue
            part.add_header('Content-ID', '<{}>'.format(cid))
            attachments.append(part)
            if self.images_inline:
                attachments.append(MIMEText(
                    u'<img src="cid:{}" alt="{}">'.format(cid, ibn),
                    'html', 'utf-8')
                )
        return attachments


def send_exception_email(exception, recipients=None, board=None,
                         subject=None, prepend_body=None, settings_file=None):
    # Builds an HTML report about ``exception``. When ``recipients`` is not
    # given, the "maintainers" entry of the settings file is used; if that
    # file does not exist the function returns without doing anything.
    settings_file = settings_file or DEFAULT_SETTINGS_FILE
    if not recipients:
        if not os.path.exists(settings_file):
            return
        with open(settings_file, 'r') as f:
            settings = json.load(f)
        if 'maintainers' not in settings:
            raise EmailInvalidSettingsError(
                'recipents not given and "maintainers" not in settings file: {}'.format(
                    settings_file))
        recipients = settings['maintainers']
    email = Email(
        to=recipients,
        files=[],
        body='',
        body_type='html'
    )
    error_info = {
        'exception': exception.__class__.__name__,
        'cause': str(exception),
        'time': datetime.now().strftime('%d-%m-%y %H:%M:%S'),
        'trace': traceback.format_exc()
    }
    if prepend_body:
        email.body += '{}<br><hr><br>'.format(prepend_body)
    email.body += '''
    <b>Exception:</b> {}<br>
    <b>Cause:</b> {}<br>
    <b>Time:</b> {}<br>
    <b>Trace:</b> {}<br>
    <hr>
    '''.format(
        error_info['exception'],
        error_info['cause'],
        error_info['time'],
        '<br>'.join(error_info['trace'].split('\n')))
    email.body += '''
    <b>Platform: </b>{}<br><hr>
    '''.format('<br>'.join(list(str(platform.uname()).split(','))))
    if subject:
        email.subject = subject
    else:
        email.subject = 'Unhandled Exception Occured: [{}]'.format(
            error_info['exception'])
    # NOTE(review): the listing lost its indentation; this check is assumed
    # to sit at function level rather than inside the else branch - confirm.
    if board:
        email.subject = ('{} [{}]'.format(email.subject, board.name))
    if board:
        email.body += '<b>Board Info:</b><br>'
        email.body += '<br>'.join(board.show_hier().split('\n'))
        email.body += '<hr><br>'
        email.files.append(board.log_file)
        if board.console and board.console.log_file:
            email.files.append(board.console.log_file)
        board.log(email.subject, color='red', bold=True)
        board.log(str(error_info), color='red', bold=True)
        board.log('Informing lab maintainers via email')
    # ... (the rest of this function is cut off in the source listing)

Full Screen

Full Screen

loop_node.py

Source:loop_node.py Github

copy

Full Screen

...43 self.invariants.insert(0, invariant)44 def append_invariants(self, invariants: List[Expr]) -> None:45 """Append ``invariants`` to the invariants list."""46 self.invariants.extend(invariants)47 def prepend_body(self, statements: List[Stmt]) -> None:48 """Prepend ``statements`` to body."""49 self.body[0:0] = statements50 def append_body(self, statement: Stmt) -> None:51 """Append ``statement`` to body."""52 self.body.append(statement)53class ObligationLoopNodeConstructor(StatementNodeConstructorBase):54 """A class that creates a while loop node with obligation stuff."""55 def __init__(56 self, obligation_loop: ObligationLoop,57 node: Union[ast.While, ast.For],58 translator: 'AbstractTranslator', ctx: Context,59 obligation_manager: ObligationManager) -> None:60 position = translator.to_position(node, ctx)61 info = translator.no_info(ctx)62 super().__init__(63 translator, ctx, obligation_manager, position, info, node)64 self._obligation_loop = obligation_loop65 self._node = node66 def get_statements(self) -> List[Stmt]:67 """Get all generated statements."""68 return self._statements69 def construct_loop(self) -> None:70 """Construct statements to perform a loop."""71 self._add_current_wait_level()72 self._add_additional_invariants()73 self._add_leak_check()74 self._set_up_measures()75 self._bound_obligations()76 self._save_must_terminate_amount(77 self._loop_obligation_info.original_must_terminate_var)78 self._save_loop_termination()79 self._set_loop_check_before()80 self._set_loop_check_after_body()81 self._check_loop_preserves_termination()82 self._prepend_loop_body()83 self._add_loop()84 self._add_additional_statements_after_loop()85 self._reset_must_terminate(86 self._loop_obligation_info.original_must_terminate_var)87 def _add_current_wait_level(self) -> None:88 """Inhale assumptions about current wait-level variable."""89 if obligation_config.disable_waitlevel_check:90 return91 context_info = self._ctx.obligation_context.get_surrounding_loop_info()92 if 
context_info:93 context_residue_level = context_info.residue_level94 else:95 method_info = self._ctx.current_function.obligation_info96 context_residue_level = method_info.residue_level97 invariant = self._translator.initialize_current_wait_level(98 sil.PermVar(self._loop_obligation_info.residue_level),99 sil.PermVar(context_residue_level),100 self._ctx)101 translated = invariant.translate(102 self._translator, self._ctx, self._position, self._info)103 self._obligation_loop.prepend_invariant(translated)104 def _add_additional_invariants(self) -> None:105 """Add additional invariants from the obligation info."""106 self._obligation_loop.append_invariants(107 self._loop_obligation_info.get_additional_invariants())108 def _add_leak_check(self) -> None:109 """Add leak checks to invariant."""110 reference_name = self._ctx.actual_function.get_fresh_name('_r')111 leak_check = self._obligation_manager.create_leak_check(reference_name)112 loop_check_before = sil.BoolVar(113 self._loop_obligation_info.loop_check_before_var)114 termination_flag = sil.BoolVar(115 self._loop_obligation_info.termination_flag_var)116 if not obligation_config.disable_loop_context_leak_check:117 must_terminate = self._obligation_manager.must_terminate_obligation118 predicate = must_terminate.create_predicate_access(119 self._obligation_info.current_thread_var)120 termination_leak_check = sil.CurrentPerm(predicate) == sil.NoPerm()121 loop_cond = self._loop_obligation_info.construct_loop_condition()122 before_loop_leak_check = sil.InhaleExhale(123 sil.TrueLit(),124 sil.Implies(125 loop_check_before,126 sil.BigOr([127 termination_flag,128 sil.Not(loop_cond),129 sil.BigAnd([termination_leak_check, leak_check])130 ])131 )132 )133 info = self._to_info('Leak check for context.')134 position = self._to_position(135 conversion_rules=rules.OBLIGATION_LOOP_CONTEXT_LEAK_CHECK_FAIL)136 self._obligation_loop.append_invariants([137 before_loop_leak_check.translate(138 self._translator, self._ctx, position, 
info)])139 if not obligation_config.disable_loop_body_leak_check:140 body_leak_check = sil.InhaleExhale(141 sil.TrueLit(),142 sil.Implies(sil.Not(loop_check_before), leak_check))143 info = self._to_info('Leak check for loop body.')144 position = self._to_position(145 conversion_rules=rules.OBLIGATION_LOOP_BODY_LEAK_CHECK_FAIL)146 self._obligation_loop.append_invariants([147 body_leak_check.translate(148 self._translator, self._ctx, position, info)])149 def _set_up_measures(self) -> None:150 """Create and initialize loop's measure map."""151 if obligation_config.disable_measures:152 return153 # Set up measures.154 loop_measure_map = self._loop_obligation_info.loop_measure_map155 instances = self._loop_obligation_info.get_all_instances()156 statements = loop_measure_map.initialize(157 instances, self._translator, self._ctx)158 self._obligation_loop.prepend_body(statements)159 def _bound_obligations(self) -> None:160 """Convert all unbounded obligations to bounded ones."""161 statements = bound_obligations(162 self._loop_obligation_info.get_all_instances(),163 self._translator, self._ctx, self._position, self._info)164 self._obligation_loop.prepend_body(statements)165 def _save_loop_termination(self) -> None:166 """Save if loop promises to terminate into a variable."""167 if obligation_config.disable_termination_check:168 return169 assign = sil.Assign(170 self._loop_obligation_info.termination_flag_var,171 self._loop_obligation_info.create_termination_check(True))172 info = self._to_info('Save loop termination promise.')173 self._append_statement(assign, info=info)174 def _set_loop_check_before(self) -> None:175 """Set the variable indicating that we are before loop."""176 if obligation_config.disable_all:177 return178 assign = sil.Assign(179 self._loop_obligation_info.loop_check_before_var,180 sil.TrueLit())181 info = self._to_info('We are before loop.')182 self._append_statement(assign, info=info)183 def _set_loop_check_after_body(self) -> None:184 """Set the 
variable indicating that we are after loop body."""185 if obligation_config.disable_all:186 return187 assign = sil.Assign(188 self._loop_obligation_info.loop_check_before_var,189 sil.FalseLit())190 info = self._to_info('We are after loop body.')191 statement = assign.translate(192 self._translator, self._ctx, self._position, info)193 self._obligation_loop.append_body(statement)194 def _check_loop_preserves_termination(self) -> None:195 """Check that loop keeps the promise to terminate."""196 if obligation_config.disable_termination_check:197 return198 check = sil.Implies(199 sil.BoolVar(self._loop_obligation_info.termination_flag_var),200 sil.BigOr([201 sil.Not(self._loop_obligation_info.construct_loop_condition()),202 self._loop_obligation_info.create_termination_check(False)203 ])204 )205 assertion = sil.Assert(check)206 position = self._to_position(207 conversion_rules=rules.OBLIGATION_LOOP_TERMINATION_PROMISE_FAIL)208 comment = 'Check if loop continues to terminate.'209 if isinstance(self._viper, ViperASTExtended):210 info = self._viper.SIFInfo([comment], continue_unaware=True)211 else:212 info = self._to_info(comment)213 statement = assertion.translate(214 self._translator, self._ctx, position, info)215 self._obligation_loop.append_body(statement)216 def _prepend_loop_body(self) -> None:217 """Add additional statements before loop from the obligation info."""218 self._obligation_loop.prepend_body(219 self._loop_obligation_info.get_prepend_body())220 def _add_loop(self) -> None:221 """Add the actual loop node."""222 body_block = self._translator.translate_block(223 self._obligation_loop.body, self._position, self._info)224 loop = self._viper.While(225 self._obligation_loop.condition, self._obligation_loop.invariants,226 self._obligation_loop.local_vars, body_block,227 self._position, self._info)228 self._statements.append(loop)229 def _add_additional_statements_after_loop(self) -> None:230 """Add additional statements after loop from the obligation info."""231 
self._statements.extend(232 self._loop_obligation_info.get_after_loop())233 @property...

Full Screen

Full Screen

test_add_error.py

Source:test_add_error.py Github

copy

Full Screen

from uuid import uuid4
import pytest
import logbook


@pytest.mark.parametrize('failure_type', ['failure', 'error'])
@pytest.mark.parametrize('use_custom_message', [True, False])
def test_add_error_custom_exc_info(suite, suite_test, failure_type, use_custom_message):
    """add_error/add_failure given an explicit exc_info reports that exception."""
    message = str(uuid4())
    add_error_args = '{!r}, '.format(message) if use_custom_message else ''
    add_error_args += 'exc_info=exc_info'
    code = """
import sys
try:
    1/0
except ZeroDivisionError:
    exc_info = sys.exc_info()
    try:
        None.a = 2
    except AttributeError:
        slash.add_{0}({1})
    """.format(failure_type, add_error_args)
    for line in code.strip().splitlines():
        suite_test.append_line(line)

    expecting_error = failure_type == 'error'
    if expecting_error:
        suite_test.expect_error()
    else:
        suite_test.expect_failure()

    summary = suite.run()
    (result,) = summary.get_all_results_for_test(suite_test)
    if expecting_error:
        (err,) = result.get_errors()
    else:
        (err,) = result.get_failures()

    assert err.has_custom_message() == use_custom_message
    if use_custom_message:
        assert err.message == message
    # The reported exception is the one captured in exc_info
    assert err.exception_type is ZeroDivisionError
    assert err.traceback.frames


def test_add_error_that_forbids_setattr(suite, suite_test):
    """An exception whose __setattr__ raises is still recorded as an error."""
    @suite_test.append_body
    def __code__():  # pylint: disable=unused-variable
        class MyException(Exception):
            def __setattr__(self, *args):
                raise Exception("Set-attr")
        raise MyException("Special Message")

    suite_test.expect_error()
    summary = suite.run()
    (result,) = summary.get_all_results_for_test(suite_test)
    (err,) = result.get_errors()
    assert "Special Message" in err.message  # err.message contains the full repr
    assert err.exception_type.__name__ == "MyException"


def test_add_fatal_error(suite, suite_test):
    """After a fatal error the tests that follow are expected not to run."""
    @suite_test.append_body
    def __code__():  # pylint: disable=unused-variable
        slash.add_error('bla').mark_fatal()  # pylint: disable=undefined-variable

    suite_test.expect_error()
    for later_test in suite.iter_all_after(suite_test):
        later_test.expect_not_run()
    session = suite.run().session
    assert session.results.has_fatal_errors()


def test_session_level_add_error_message(suite, suite_test):
    """An error added from a session_end hook appears in the global result."""
    @suite_test.file.append_body
    def __code__():  # pylint: disable=unused-variable
        @slash.hooks.session_end.register  # pylint: disable=undefined-variable
        def _callback():
            slash.add_error('session: add_error')  # pylint: disable=undefined-variable

    res = suite.run(expect_session_errors=True)
    errors = res.session.results.global_result.get_errors()
    assert len(errors) == 1
    (err,) = errors
    assert err.message == 'session: add_error'


@pytest.mark.parametrize('log_variables', [True, False])
def test_add_error_log_traceback_variables(suite, suite_test, log_variables, config_override, tmpdir):
    """Traceback variables reach the log only when log.traceback_variables is set."""
    config_override('log.core_log_level', logbook.TRACE)
    config_override('log.traceback_variables', log_variables)
    config_override('log.root', str(tmpdir.join('logs')))

    @suite_test.prepend_body
    def __code__():  # pylint: disable=unused-variable
        # to avoid the line itself from being detected
        x_variable = 'x' * 3  # pylint: disable=unused-variable

        class Object(object):
            def __init__(self):
                self.property_value = 'yyy'
        self = Object()  # pylint: disable=unused-variable

    suite_test.when_run.error()
    res = suite.run()
    result = res[suite_test]
    with open(result.get_log_path()) as f:
        lines = f.readlines()

    def _search_variable(variable_name, variable_value):
        # A hit is one log line mentioning both the name and the value
        found = any(
            variable_name in line and variable_value in line for line in lines)
        assert found == log_variables, 'Variable {!r} not found in traceback log!'.format(variable_name)

    _search_variable('x_variable', 'xxx')
    _search_variable('self.property_value', 'yyy')


def test_add_error_log_traceback_variables_self_none(suite, suite_test, config_override, tmpdir):
    """A local named ``self`` that is None is logged as 'self: None'."""
    config_override('log.core_log_level', logbook.TRACE)
    config_override('log.traceback_variables', True)
    config_override('log.root', str(tmpdir.join('logs')))

    @suite_test.prepend_body
    def __code__():  # pylint: disable=unused-variable
        # to avoid the line itself from being detected
        self = None  # pylint: disable=unused-variable

    suite_test.when_run.error()
    res = suite.run()
    result = res[suite_test]
    with open(result.get_log_path()) as f:
        lines = f.read()
    assert 'self: None' in lines
    (err,) = result.get_errors()
    # ... (the rest of this test is cut off in the source listing)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful